commit c5347ba933ef1212aec32d95a4c10911ddc8c0d8
Author: Sahil Takiar
Date:   Fri Feb 9 14:46:09 2018 -0800

    HIVE-18677: SparkClientImpl usage of SessionState.LogHelper doesn't respect isSilent value

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 93d44dc24c..c571d1a41b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -101,8 +101,7 @@
   }

   private void createRemoteClient() throws Exception {
-    remoteClient = SparkClientFactory.createClient(conf, hiveConf, sessionId,
-        SessionState.LogHelper.getInfoStream());
+    remoteClient = SparkClientFactory.createClient(conf, hiveConf, sessionId);

     if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_PREWARM_ENABLED) &&
         (SparkClientUtilities.isYarnMaster(hiveConf.get("spark.master")) ||
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index 943a4ee00a..913c9d7bf1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hive.ql.exec.spark;

 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.URI;
+import java.net.UnknownHostException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -304,4 +306,11 @@ private static boolean isDirectDPPBranch(Operator op) {
     }
     return false;
   }
+
+  public static String reverseDNSLookupURL(String url) throws UnknownHostException {
+    // Run a reverse DNS lookup on the URL
+    URI uri = URI.create(url);
+    InetAddress address = InetAddress.getByName(uri.getHost());
+    return uri.getScheme() + "://" + address.getCanonicalHostName() + ":" + uri.getPort();
+  }
 }
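[Note: annotation, not part of the patch] The new SparkUtilities.reverseDNSLookupURL helper is the one piece of reusable logic this patch introduces; it is lifted from the code removed from RemoteDriver further below. A minimal, self-contained sketch of its behavior, where the class name and the loopback URL are invented for the example:

    import java.net.InetAddress;
    import java.net.URI;
    import java.net.UnknownHostException;

    public class ReverseDnsLookupExample {
      public static void main(String[] args) throws UnknownHostException {
        String url = "http://127.0.0.1:4040"; // hypothetical Spark web UI address
        URI uri = URI.create(url);
        // getByName() resolves the host; getCanonicalHostName() then performs the
        // reverse (PTR) lookup, falling back to the textual IP when no PTR exists.
        InetAddress address = InetAddress.getByName(uri.getHost());
        System.out.println(uri.getScheme() + "://" + address.getCanonicalHostName()
            + ":" + uri.getPort());
        // Typically prints "http://localhost:4040", but the output is host-dependent.
      }
    }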
console.printInfo("\nQuery Hive on Spark job[" + sparkJobStatus.getJobId() + "] stages: " + Arrays.toString(sparkJobStatus.getStageIds())); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java index c2c8fb4a1a..8474afcc2a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java @@ -43,6 +43,8 @@ SparkStatistics getSparkStatistics(); + String getWebUIURL(); + void cleanup(); Throwable getError(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java index 3e84175b69..8b031e7d26 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java @@ -17,11 +17,13 @@ */ package org.apache.hadoop.hive.ql.exec.spark.status.impl; +import java.net.UnknownHostException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; @@ -151,6 +153,20 @@ public SparkStatistics getSparkStatistics() { return sparkStatisticsBuilder.build(); } + @Override + public String getWebUIURL() { + try { + if (sparkContext.sc().uiWebUrl().isDefined()) { + return SparkUtilities.reverseDNSLookupURL(sparkContext.sc().uiWebUrl().get()); + } else { + return "UNDEFINED"; + } + } catch (Exception e) { + LOG.warn("Failed to get web UI URL.", e); + } + return "UNKNOWN"; + } + @Override public void cleanup() { jobMetricsListener.cleanup(jobId); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index ce07a9fadd..e950452c31 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.spark.status.impl; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -38,6 +39,8 @@ import org.apache.spark.api.java.JavaFutureAction; import java.io.Serializable; +import java.net.InetAddress; +import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -137,6 +140,20 @@ public SparkStatistics getSparkStatistics() { return sparkStatisticsBuilder.build(); } + @Override + public String getWebUIURL() { + Future getWebUIURL = sparkClient.run(new GetWebUIURLJob()); + try { + return getWebUIURL.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("Failed to get web UI URL.", e); + if (Thread.interrupted()) { + error = e; + } + return "UNKNOWN"; + } + } + @Override public void cleanup() { @@ -287,4 +304,18 @@ public String call(JobContext jc) throws Exception { return jc.sc().sc().applicationId(); } } + + private static class GetWebUIURLJob implements Job { + + public GetWebUIURLJob() { + } + + @Override + public String call(JobContext jc) throws Exception { + 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index ce07a9fadd..e950452c31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;

+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -38,6 +39,8 @@
 import org.apache.spark.api.java.JavaFutureAction;

 import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.URI;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -137,6 +140,20 @@ public SparkStatistics getSparkStatistics() {
     return sparkStatisticsBuilder.build();
   }

+  @Override
+  public String getWebUIURL() {
+    Future<String> getWebUIURL = sparkClient.run(new GetWebUIURLJob());
+    try {
+      return getWebUIURL.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.warn("Failed to get web UI URL.", e);
+      if (Thread.interrupted()) {
+        error = e;
+      }
+      return "UNKNOWN";
+    }
+  }
+
   @Override
   public void cleanup() {
@@ -287,4 +304,18 @@ public String call(JobContext jc) throws Exception {
       return jc.sc().sc().applicationId();
     }
   }
+
+  private static class GetWebUIURLJob implements Job<String> {
+
+    public GetWebUIURLJob() {
+    }
+
+    @Override
+    public String call(JobContext jc) throws Exception {
+      if (jc.sc().sc().uiWebUrl().isDefined()) {
+        return SparkUtilities.reverseDNSLookupURL(jc.sc().sc().uiWebUrl().get());
+      }
+      return "UNDEFINED";
+    }
+  }
 }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
index 729080936b..7ca89ed275 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
@@ -165,17 +165,4 @@
     }
   }
-
-  protected static class SparkUIWebURL implements Serializable {
-
-    final String UIWebURL;
-
-    SparkUIWebURL(String UIWebURL) {
-      this.UIWebURL = UIWebURL;
-    }
-
-    SparkUIWebURL() {
-      this(null);
-    }
-  }
 }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index 66cf33942c..e584cbb0a7 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -171,14 +171,6 @@ public void rpcClosed(Rpc rpc) {
       throw e;
     }

-    if (jc.sc().sc().uiWebUrl().isDefined()) {
-      // Run a reverse DNS lookup on the URL
-      URI uri = URI.create(jc.sc().sc().uiWebUrl().get());
-      InetAddress address = InetAddress.getByName(uri.getHost());
-      this.protocol.sendUIWebURL(uri.getScheme() + "://" + address.getCanonicalHostName() + ":" +
-          uri.getPort());
-    }
-
     synchronized (jcLock) {
       for (Iterator<JobWrapper<?>> it = jobQueue.iterator(); it.hasNext();) {
         it.next().submit();
@@ -280,11 +272,6 @@ void sendMetrics(String jobId, int sparkJobId, int stageId, long taskId, Metrics metrics) {
     clientRpc.call(new JobMetrics(jobId, sparkJobId, stageId, taskId, metrics));
   }

-  void sendUIWebURL(String UIWebURL) {
-    LOG.debug("Send UIWebURL({}) to Client.", UIWebURL);
-    clientRpc.call(new SparkUIWebURL(UIWebURL));
-  }
-
   private void handle(ChannelHandlerContext ctx, CancelJob msg) {
     JobWrapper<?> job = activeJobs.get(msg.id);
     if (job == null || !cancelJob(job)) {
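[Note: annotation, not part of the patch] The removals above retire the push model: RemoteDriver no longer computes the URL and sends it over RPC (sendUIWebURL/SparkUIWebURL), because SparkClientImpl printed that message to a raw PrintStream that bypassed the isSilent check in SessionState.LogHelper, which is the bug HIVE-18677 fixes. The URL is now pulled on demand via GetWebUIURLJob and printed through LogHelper. A hedged sketch of the behavioral difference, assuming a LogHelper obtained from SessionState.getConsole() and a url variable in scope:

    // Old path (removed): wrote straight to the captured PrintStream, so the
    // URL was printed even when the session was silenced.
    //   consoleStream.println("Hive on Spark Session Web UI URL: " + url);

    // New path: LogHelper.printInfo() consults the session's isSilent value,
    // so "set silent on" suppresses the console output.
    SessionState.LogHelper console = SessionState.getConsole();
    console.printInfo("Hive on Spark Session Web UI URL: " + url);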
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
index 6dfc1a5807..fd9b72583a 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
@@ -82,10 +82,10 @@ public static void stop() {
    * @param hiveConf Configuration for Hive, contains hive.* properties.
    */
   public static SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf,
-      String sessionId, PrintStream consoleStream)
+      String sessionId)
       throws IOException, SparkException {
     Preconditions.checkState(server != null, "initialize() not called.");
-    return new SparkClientImpl(server, sparkConf, hiveConf, sessionId, consoleStream);
+    return new SparkClientImpl(server, sparkConf, hiveConf, sessionId);
   }
 }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 78317ed6d5..214f74bdfe 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -89,15 +89,13 @@
   private final Map<String, JobHandleImpl<?>> jobs;
   private final Rpc driverRpc;
   private final ClientProtocol protocol;
-  private final PrintStream consoleStream;
   private volatile boolean isAlive;

   SparkClientImpl(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf,
-      String sessionid, PrintStream consoleStream) throws IOException {
+      String sessionid) throws IOException {
     this.conf = conf;
     this.hiveConf = hiveConf;
     this.jobs = Maps.newConcurrentMap();
-    this.consoleStream = consoleStream;

     String secret = rpcServer.createSecret();
     this.driverThread = startDriver(rpcServer, sessionid, secret);
@@ -623,12 +621,6 @@ private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
         LOG.warn("Received spark job ID: {} for unknown job {}", msg.sparkJobId, msg.clientJobId);
       }
     }
-
-    private void handle(ChannelHandlerContext ctx, SparkUIWebURL msg) {
-      String printMsg = "Hive on Spark Session Web UI URL: " + msg.UIWebURL;
-      consoleStream.println(printMsg);
-      LOG.info(printMsg);
-    }
   }

   private static class AddJarJob implements Job<Serializable> {
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index e66caee6e5..fb31c933ca 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -303,8 +303,7 @@ private void runTest(TestFunction test) throws Exception {
     SparkClient client = null;
     try {
       test.config(conf);
-      client = SparkClientFactory.createClient(conf, HIVECONF, UUID.randomUUID().toString(),
-          mock(PrintStream.class));
+      client = SparkClientFactory.createClient(conf, HIVECONF, UUID.randomUUID().toString());
       test.call(client);
     } finally {
       if (client != null) {
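[Note: annotation, not part of the patch] With the PrintStream parameter gone, callers create clients with the three-argument factory method, as the updated test shows. A minimal caller sketch; the class and method names are invented, and SparkClientFactory.initialize() must already have been called, per the Preconditions check in the factory:

    import java.io.IOException;
    import java.util.Map;
    import java.util.UUID;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hive.spark.client.SparkClient;
    import org.apache.hive.spark.client.SparkClientFactory;
    import org.apache.hive.spark.client.SparkException;

    public class CreateClientExample {
      static SparkClient newClient(Map<String, String> sparkConf, HiveConf hiveConf)
          throws IOException, SparkException {
        // The session id only needs to be unique; the test uses a random UUID.
        return SparkClientFactory.createClient(sparkConf, hiveConf, UUID.randomUUID().toString());
      }
    }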