diff --git a/pom.xml b/pom.xml index b55e86a..0cd4238 100644 --- a/pom.xml +++ b/pom.xml @@ -161,7 +161,7 @@ 4.0.4 0.5.2 2.2.0 - 1.4.0 + 1.5.0 2.10 2.10.4 1.1 @@ -222,16 +222,6 @@ false - - spark-1.3 - https://s3-us-west-1.amazonaws.com/hive-spark/maven2/spark_2.10-1.3-rc1/ - - true - - - false - - diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java index 51772cd..52f4b9c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java @@ -23,29 +23,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.spark.JavaSparkListener; import org.apache.spark.executor.TaskMetrics; -import org.apache.spark.scheduler.SparkListener; -import org.apache.spark.scheduler.SparkListenerApplicationEnd; -import org.apache.spark.scheduler.SparkListenerApplicationStart; -import org.apache.spark.scheduler.SparkListenerBlockManagerAdded; -import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved; -import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate; -import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; -import org.apache.spark.scheduler.SparkListenerJobEnd; import org.apache.spark.scheduler.SparkListenerJobStart; -import org.apache.spark.scheduler.SparkListenerStageCompleted; -import org.apache.spark.scheduler.SparkListenerStageSubmitted; import org.apache.spark.scheduler.SparkListenerTaskEnd; -import org.apache.spark.scheduler.SparkListenerTaskGettingResult; -import org.apache.spark.scheduler.SparkListenerTaskStart; -import org.apache.spark.scheduler.SparkListenerUnpersistRDD; -import org.apache.spark.scheduler.SparkListenerExecutorRemoved; -import org.apache.spark.scheduler.SparkListenerExecutorAdded; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -public class JobMetricsListener implements SparkListener { +public class JobMetricsListener extends JavaSparkListener { private static final Log LOG = LogFactory.getLog(JobMetricsListener.class); @@ -54,36 +40,6 @@ private final Map>> allJobMetrics = Maps.newHashMap(); @Override - public void onExecutorRemoved(SparkListenerExecutorRemoved removed) { - - } - - @Override - public void onExecutorAdded(SparkListenerExecutorAdded added) { - - } - - @Override - public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { - - } - - @Override - public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { - - } - - @Override - public void onTaskStart(SparkListenerTaskStart taskStart) { - - } - - @Override - public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { - - } - - @Override public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) { int stageId = taskEnd.stageId(); int stageAttemptId = taskEnd.stageAttemptId(); @@ -119,46 +75,6 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { jobIdToStageId.put(jobId, intStageIds); } - @Override - public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { - - } - - @Override - public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { - - } - - @Override - public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { - - } - - @Override - public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { - - } - - @Override - public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { - - } - - @Override - public void onApplicationStart(SparkListenerApplicationStart applicationStart) { - - } - - @Override - public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { - - } - - @Override - public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { - - } - public synchronized Map> getJobMetric(int jobId) { return allJobMetrics.get(jobId); } diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index b77c9e8..f5b1e48 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -43,26 +43,13 @@ import org.apache.hive.spark.client.rpc.Rpc; import org.apache.hive.spark.client.rpc.RpcConfiguration; import org.apache.hive.spark.counter.SparkCounters; +import org.apache.spark.JavaSparkListener; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaFutureAction; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.scheduler.SparkListener; -import org.apache.spark.scheduler.SparkListenerApplicationEnd; -import org.apache.spark.scheduler.SparkListenerApplicationStart; -import org.apache.spark.scheduler.SparkListenerBlockManagerAdded; -import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved; -import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate; -import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; import org.apache.spark.scheduler.SparkListenerJobEnd; import org.apache.spark.scheduler.SparkListenerJobStart; -import org.apache.spark.scheduler.SparkListenerStageCompleted; -import org.apache.spark.scheduler.SparkListenerStageSubmitted; import org.apache.spark.scheduler.SparkListenerTaskEnd; -import org.apache.spark.scheduler.SparkListenerTaskGettingResult; -import org.apache.spark.scheduler.SparkListenerTaskStart; -import org.apache.spark.scheduler.SparkListenerUnpersistRDD; -import org.apache.spark.scheduler.SparkListenerExecutorRemoved; -import org.apache.spark.scheduler.SparkListenerExecutorAdded; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -438,21 +425,11 @@ private void monitorJob(JavaFutureAction job, } - private class ClientListener implements SparkListener { + private class ClientListener extends JavaSparkListener { private final Map stageToJobId = Maps.newHashMap(); @Override - public void onExecutorRemoved(SparkListenerExecutorRemoved removed) { - - } - - @Override - public void onExecutorAdded(SparkListenerExecutorAdded added) { - - } - - @Override public void onJobStart(SparkListenerJobStart jobStart) { synchronized (stageToJobId) { for (int i = 0; i < jobStart.stageIds().length(); i++) { @@ -500,39 +477,6 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) { } } - @Override - public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { } - - @Override - public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { } - - @Override - public void onTaskStart(SparkListenerTaskStart taskStart) { } - - @Override - public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { } - - @Override - public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { } - - @Override - public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { } - - @Override - public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { } - - @Override - public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { } - - @Override - public void onApplicationStart(SparkListenerApplicationStart applicationStart) { } - - @Override - public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { } - - @Override - public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { } - /** * Returns the client job ID for the given Spark job ID. *