commit 56d3599b7287891a7ad9e8fbf460b56685642415
Author: Sahil Takiar
Date:   Thu Oct 19 08:14:18 2017 -0700

    HIVE-17838: Make org.apache.hive.spark.client.rpc logging HoS specific and other logging cleanup

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 99df96736d..8baf309e7f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -495,8 +495,8 @@
   FILE_NOT_FOUND(20012, "File not found: {0}", "64000", true),
   WRONG_FILE_FORMAT(20013, "Wrong file format. Please check the file's format.", "64000", true),
-  SPARK_CREATE_CLIENT_INVALID_QUEUE(20014, "Spark job is submitted to an invalid queue: {0}."
-      + " Please fix and try again.", true),
+  SPARK_CREATE_CLIENT_INVALID_QUEUE(20014, "Spark app for session {0} was submitted to an invalid"
+      + " queue: {1}. Please fix and try again.", true),
   SPARK_RUNTIME_OOM(20015, "Spark job failed because of out of memory."),
 
   // An exception from runtime that will show the full stack to client
@@ -574,13 +574,13 @@
   SPARK_CREATE_CLIENT_TIMEOUT(30038,
       "Timed out while creating Spark client for session {0}.", true),
   SPARK_CREATE_CLIENT_QUEUE_FULL(30039,
-      "Failed to create Spark client because job queue is full: {0}.", true),
+      "Failed to create Spark client for session {0} because job queue is full: {1}.", true),
   SPARK_CREATE_CLIENT_INTERRUPTED(30040,
       "Interrupted while creating Spark client for session {0}", true),
   SPARK_CREATE_CLIENT_ERROR(30041,
       "Failed to create Spark client for Spark session {0}: {1}", true),
   SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST(30042,
-      "Failed to create Spark client due to invalid resource request: {0}", true),
+      "Failed to create Spark client for session {0} due to invalid resource request: {1}", true),
   SPARK_CREATE_CLIENT_CLOSED_SESSION(30043, "Cannot create Spark client on a closed session {0}", true),

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 565c43b9c2..5ed5d4214e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -104,7 +104,7 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String se
       inputStream = HiveSparkClientFactory.class.getClassLoader()
           .getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
       if (inputStream != null) {
-        LOG.info("loading spark properties from: " + SPARK_DEFAULT_CONF_FILE);
+        LOG.info("Loading Spark properties from: " + SPARK_DEFAULT_CONF_FILE);
         Properties properties = new Properties();
         properties.load(new InputStreamReader(inputStream, CharsetNames.UTF_8));
         for (String propertyName : properties.stringPropertyNames()) {
@@ -118,7 +118,7 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String se
         }
       }
     } catch (IOException e) {
-      LOG.info("Failed to open spark configuration file: "
+      LOG.info("Failed to open Spark configuration file: "
           + SPARK_DEFAULT_CONF_FILE, e);
     } finally {
       if (inputStream != null) {

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index bfa2da68e1..803877162d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -130,7 +130,7 @@ public int execute(DriverContext driverContext) {
       if (driverContext.isShutdown()) {
         LOG.warn("Killing Spark job");
         killJob();
-        throw new HiveException("Operation is cancelled.");
+        throw new HiveException(String.format("Spark task %s cancelled for query %s", getId(), sparkWork.getQueryId()));
       }
 
       // Get the Job Handle id associated with the Spark job
@@ -176,7 +176,7 @@ public int execute(DriverContext driverContext) {
               ? "UNKNOWN" : jobID));
           killJob();
         } else if (rc == 4) {
-          LOG.info("The spark job or one stage of it has too many tasks" +
+          LOG.info("The Spark job or one stage of it has too many tasks" +
               ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
           killJob();
         }
@@ -186,7 +186,7 @@ public int execute(DriverContext driverContext) {
       }
       sparkJobStatus.cleanup();
     } catch (Exception e) {
-      String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'";
+      String msg = "Failed to execute Spark task " + getId() + ", with exception '" + Utilities.getNameMessage(e) + "'";
 
       // Has to use full name to make sure it does not conflict with
       // org.apache.commons.lang.StringUtils

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 189de1930d..c8cb1ac08c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -79,7 +79,7 @@ public SparkSessionImpl() {
 
   @Override
   public void open(HiveConf conf) throws HiveException {
-    LOG.info("Trying to open Spark session {}", sessionId);
+    LOG.info("Trying to open Hive on Spark session {}", sessionId);
     this.conf = conf;
     isOpen = true;
     try {
@@ -94,12 +94,12 @@ public void open(HiveConf conf) throws HiveException {
       }
       throw he;
     }
-    LOG.info("Spark session {} is successfully opened", sessionId);
+    LOG.info("Hive on Spark session {} successfully opened", sessionId);
   }
 
   @Override
   public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
-    Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs.");
+    Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs.");
     return hiveSparkClient.execute(driverContext, sparkWork);
   }
@@ -129,9 +129,9 @@ public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) thro
       totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1);
       long memoryPerTaskInBytes = totalMemory / totalCores;
-      LOG.info("Spark cluster current has executors: " + numExecutors
+      LOG.info("Hive on Spark application currently has number of executors: " + numExecutors
           + ", total cores: " + totalCores + ", memory per executor: "
-          + executorMemoryInMB + "M, memoryFraction: " + memoryFraction);
+          + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction);
       return new ObjectPair(Long.valueOf(memoryPerTaskInBytes), Integer.valueOf(totalCores));
     }
@@ -153,15 +153,15 @@ public String getSessionId() {
 
   @Override
   public void close() {
-    LOG.info("Trying to close Spark session {}", sessionId);
+    LOG.info("Trying to close Hive on Spark session {}", sessionId);
     isOpen = false;
     if (hiveSparkClient != null) {
       try {
         hiveSparkClient.close();
-        LOG.info("Spark session {} is successfully closed", sessionId);
+        LOG.info("Hive on Spark session {} successfully closed", sessionId);
         cleanScratchDir();
       } catch (IOException e) {
-        LOG.error("Failed to close spark session (" + sessionId + ").", e);
+        LOG.error("Failed to close Hive on Spark session (" + sessionId + ")", e);
       }
     }
     hiveSparkClient = null;
@@ -197,20 +197,22 @@ HiveException getHiveException(Throwable e) {
     StringBuilder matchedString = new StringBuilder();
     while (e != null) {
       if (e instanceof TimeoutException) {
-        return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT);
+        return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT, sessionId);
       } else if (e instanceof InterruptedException) {
         return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INTERRUPTED, sessionId);
       } else if (e instanceof RuntimeException) {
         String sts = Throwables.getStackTraceAsString(e);
         if (matches(sts, AM_TIMEOUT_ERR, matchedString)) {
-          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT);
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT, sessionId);
         } else if (matches(sts, UNKNOWN_QUEUE_ERR, matchedString) || matches(sts, STOPPED_QUEUE_ERR, matchedString)) {
-          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, matchedString.toString());
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, sessionId,
+              matchedString.toString());
         } else if (matches(sts, FULL_QUEUE_ERR, matchedString)) {
-          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL, matchedString.toString());
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL, sessionId,
+              matchedString.toString());
         } else if (matches(sts, INVALILD_MEM_ERR, matchedString) || matches(sts, INVALID_CORE_ERR, matchedString)) {
           return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST,
-              matchedString.toString());
+              sessionId, matchedString.toString());
         } else {
           return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId,
               Throwables.getRootCause(e).getMessage());
         }

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index eecb103de0..ab87c79c48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -264,9 +264,9 @@ private void getSparkMemoryAndCores(OptimizeSparkProcContext context) throws Sem
           context.getConf(), sparkSessionManager);
       sparkMemoryAndCores = sparkSession.getMemoryAndCores();
     } catch (HiveException e) {
-      throw new SemanticException("Failed to get a spark session: " + e);
+      throw new SemanticException("Failed to get a Hive on Spark session", e);
     } catch (Exception e) {
-      LOG.warn("Failed to get spark memory/core info", e);
+      LOG.warn("Failed to get spark memory/core info, reducer parallelism may be inaccurate", e);
     } finally {
       if (sparkSession != null && sparkSessionManager != null) {
         try {

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
index 6a988a49ea..558ed80ee8 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
@@ -38,10 +38,20 @@
       this(null);
     }
 
+    @Override
+    public String toString() {
+      return "CancelJob{" +
+              "id='" + id + '\'' +
+              '}';
+    }
   }
 
   protected static class EndSession implements Serializable {
 
+    @Override
+    public String toString() {
+      return "EndSession";
+    }
   }
 
   protected static class Error implements Serializable {
@@ -56,6 +66,12 @@
       this(null);
     }
 
+    @Override
+    public String toString() {
+      return "Error{" +
+              "cause='" + cause + '\'' +
+              '}';
+    }
   }
 
   protected static class JobMetrics implements Serializable {
@@ -78,6 +94,16 @@
       this(null, -1, -1, -1, null);
     }
 
+    @Override
+    public String toString() {
+      return "JobMetrics{" +
+              "jobId='" + jobId + '\'' +
+              ", sparkJobId=" + sparkJobId +
+              ", stageId=" + stageId +
+              ", taskId=" + taskId +
+              ", metrics=" + metrics +
+              '}';
+    }
   }
 
   protected static class JobRequest implements Serializable {
@@ -94,6 +120,13 @@
       this(null, null);
     }
 
+    @Override
+    public String toString() {
+      return "JobRequest{" +
+              "id='" + id + '\'' +
+              ", job=" + job +
+              '}';
+    }
   }
 
   public static class JobResult implements Serializable {
@@ -137,6 +170,12 @@ public String toString() {
       this(null);
     }
 
+    @Override
+    public String toString() {
+      return "JobStarted{" +
+              "id='" + id + '\'' +
+              '}';
+    }
   }
 
   /**
@@ -154,6 +193,14 @@ public String toString() {
     JobSubmitted() {
       this(null, -1);
     }
+
+    @Override
+    public String toString() {
+      return "JobSubmitted{" +
+              "clientJobId='" + clientJobId + '\'' +
+              ", sparkJobId=" + sparkJobId +
+              '}';
+    }
   }
 
   protected static class SyncJobRequest implements Serializable {
@@ -168,5 +215,11 @@ public String toString() {
       this(null);
     }
 
+    @Override
+    public String toString() {
+      return "SyncJobRequest{" +
+              "job=" + job +
+              '}';
+    }
   }
 }

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index 6e546d4548..caa850c08c 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -25,8 +25,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.URI;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -118,17 +116,18 @@ private RemoteDriver(String[] args) throws Exception {
         // as these are non-spark specific configs used by the remote driver
         mapConf.put(val[0], val[1]);
       } else {
-        throw new IllegalArgumentException("Invalid command line: " + Joiner.on(" ").join(args));
+        throw new IllegalArgumentException("Invalid command line arguments: " +
+                Joiner.on(" ").join(args));
       }
     }
 
     executor = Executors.newCachedThreadPool();
 
-    LOG.info("Connecting to: {}:{}", serverAddress, serverPort);
+    LOG.info("Connecting to HiveServer2 address: {}:{}", serverAddress, serverPort);
     for (Tuple2 e : conf.getAll()) {
       mapConf.put(e._1(), e._2());
-      LOG.debug("Remote Driver configured with: " + e._1() + "=" + e._2());
+      LOG.debug("Remote Spark Driver configured with: " + e._1() + "=" + e._2());
     }
 
     String clientId = mapConf.get(SparkClientFactory.CONF_CLIENT_ID);
@@ -140,7 +139,7 @@ private RemoteDriver(String[] args) throws Exception {
     this.egroup = new NioEventLoopGroup(
         threadCount,
         new ThreadFactoryBuilder()
-            .setNameFormat("Driver-RPC-Handler-%d")
+            .setNameFormat("Spark-Driver-RPC-Handler-%d")
             .setDaemon(true)
             .build());
     this.protocol = new DriverProtocol();
@@ -153,9 +152,14 @@ private RemoteDriver(String[] args) throws Exception {
     this.clientRpc.addListener(new Rpc.Listener() {
       @Override
       public void rpcClosed(Rpc rpc) {
-        LOG.warn("Shutting down driver because RPC channel was closed.");
+        LOG.warn("Shutting down driver because Remote Spark Driver to HiveServer2 connection was closed.");
         shutdown(null);
       }
+
+      @Override
+      public String toString() {
+        return "Shutting Down Remote Spark Driver to HiveServer2 Connection";
+      }
     });
 
     try {
@@ -211,7 +215,7 @@ private void submit(JobWrapper job) {
       if (jc != null) {
        job.submit();
       } else {
-        LOG.info("SparkContext not yet up, queueing job request.");
+        LOG.info("SparkContext not yet up; adding Hive on Spark job request to the queue.");
         jobQueue.add(job);
       }
     }
@@ -220,9 +224,9 @@ private void submit(JobWrapper job) {
   private synchronized void shutdown(Throwable error) {
     if (running) {
       if (error == null) {
-        LOG.info("Shutting down remote driver.");
+        LOG.info("Shutting down Spark Remote Driver.");
       } else {
-        LOG.error("Shutting down remote driver due to error: " + error, error);
+        LOG.error("Shutting down Spark Remote Driver due to error: " + error, error);
       }
       running = false;
       for (JobWrapper job : activeJobs.values()) {
@@ -253,7 +257,7 @@ private boolean cancelJob(JobWrapper job) {
   private String getArg(String[] args, int keyIdx) {
     int valIdx = keyIdx + 1;
     if (args.length <= valIdx) {
-      throw new IllegalArgumentException("Invalid command line: "
+      throw new IllegalArgumentException("Invalid command line arguments: "
           + Joiner.on(" ").join(args));
     }
     return args[valIdx];
@@ -294,7 +298,7 @@ void sendMetrics(String jobId, int sparkJobId, int stageId, long taskId, Metrics
     private void handle(ChannelHandlerContext ctx, CancelJob msg) {
       JobWrapper job = activeJobs.get(msg.id);
       if (job == null || !cancelJob(job)) {
-        LOG.info("Requested to cancel an already finished job.");
+        LOG.info("Requested to cancel an already finished client job.");
       }
     }
@@ -304,7 +308,7 @@ private void handle(ChannelHandlerContext ctx, EndSession msg) {
     }
 
     private void handle(ChannelHandlerContext ctx, JobRequest msg) {
-      LOG.info("Received job request {}", msg.id);
+      LOG.debug("Received client job request {}", msg.id);
       JobWrapper wrapper = new JobWrapper(msg);
       activeJobs.put(msg.id, wrapper);
       submit(wrapper);
@@ -318,7 +322,7 @@ private Object handle(ChannelHandlerContext ctx, SyncJobRequest msg) throws Exce
         while (jc == null) {
           jcLock.wait();
           if (!running) {
-            throw new IllegalStateException("Remote context is shutting down.");
+            throw new IllegalStateException("Remote Spark context is shutting down.");
           }
         }
       }
@@ -339,6 +343,10 @@ public void call(JavaFutureAction future,
       }
     }
 
+    @Override
+    public String name() {
+      return "Remote Spark Driver to HiveServer2 Connection";
+    }
   }
 
   private class JobWrapper implements Callable {
@@ -404,12 +412,13 @@ public void call(JavaFutureAction future,
         if (sparkCounters != null) {
           counters = sparkCounters.snapshot();
         }
+
         protocol.jobFinished(req.id, result, null, counters);
       } catch (Throwable t) {
         // Catch throwables in a best-effort to report job status back to the client. It's
         // re-thrown so that the executor can destroy the affected thread (or the JVM can
         // die or whatever would happen if the throwable bubbled up).
-        LOG.error("Failed to run job " + req.id, t);
+        LOG.error("Failed to run client job " + req.id, t);
         protocol.jobFinished(req.id, null, t,
             sparkCounters != null ? sparkCounters.snapshot() : null);
         throw new ExecutionException(t);

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
index fd9b72583a..88b5c9513f 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
@@ -84,8 +84,8 @@ public static void stop() {
   public static SparkClient createClient(Map sparkConf, HiveConf hiveConf,
       String sessionId) throws IOException, SparkException {
-    Preconditions.checkState(server != null, "initialize() not called.");
+    Preconditions.checkState(server != null,
+        "Invalid state: Hive on Spark RPC Server has not been initialized");
     return new SparkClientImpl(server, sparkConf, hiveConf, sessionId);
   }
-
 }

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index d450515359..f8b5d19e7a 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -43,6 +43,7 @@
 import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -103,17 +104,16 @@
       // The RPC server will take care of timeouts here.
       this.driverRpc = rpcServer.registerClient(sessionid, secret, protocol).get();
     } catch (Throwable e) {
-      String errorMsg = null;
+      String errorMsg;
       if (e.getCause() instanceof TimeoutException) {
-        errorMsg = "Timed out waiting for client to connect.\nPossible reasons include network " +
-            "issues, errors in remote driver or the cluster has no available resources, etc." +
+        errorMsg = "Timed out waiting for Remote Spark Driver to connect to HiveServer2.\nPossible reasons " +
+            "include network issues, errors in remote driver, cluster has no available resources, etc." +
             "\nPlease check YARN or Spark driver's logs for further information.";
       } else if (e.getCause() instanceof InterruptedException) {
-        errorMsg = "Interruption occurred while waiting for client to connect.\nPossibly the Spark session is closed " +
-            "such as in case of query cancellation." +
-            "\nPlease refer to HiveServer2 logs for further information.";
+        errorMsg = "Interrupted while waiting for Remote Spark Driver to connect to HiveServer2.\nIt is possible " +
+            "that the query was cancelled which would cause the Spark Session to close.";
       } else {
-        errorMsg = "Error while waiting for client to connect.";
+        errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2.";
       }
       LOG.error(errorMsg, e);
       driverThread.interrupt();
@@ -126,14 +126,21 @@
       throw Throwables.propagate(e);
     }
 
+    LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress());
+
     driverRpc.addListener(new Rpc.Listener() {
       @Override
       public void rpcClosed(Rpc rpc) {
         if (isAlive) {
-          LOG.warn("Client RPC channel closed unexpectedly.");
+          LOG.warn("Connection to Remote Spark Driver {} closed unexpectedly", driverRpc.getRemoteAddress());
           isAlive = false;
         }
       }
+
+      @Override
+      public String toString() {
+        return "Connection to Remote Spark Driver Closed Unexpectedly";
+      }
     });
     isAlive = true;
   }
@@ -256,7 +263,7 @@ private Thread startDriver(final RpcServer rpcServer, final String clientId, fin
       try {
         URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
         if (sparkDefaultsUrl != null) {
-          LOG.info("Loading spark defaults: " + sparkDefaultsUrl);
+          LOG.info("Loading spark defaults configs from: " + sparkDefaultsUrl);
           allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
         }
       } catch (Exception e) {
@@ -574,7 +581,7 @@ void cancel(String jobId) {
     }
 
     private void handle(ChannelHandlerContext ctx, Error msg) {
-      LOG.warn("Error reported from remote driver: {}", msg.cause);
+      LOG.warn("Error reported from Remote Spark Driver: {}", msg.cause);
     }
 
     private void handle(ChannelHandlerContext ctx, JobMetrics msg) {
@@ -582,14 +589,14 @@ private void handle(ChannelHandlerContext ctx, JobMetrics msg) {
       if (handle != null) {
         handle.getMetrics().addMetrics(msg.sparkJobId, msg.stageId, msg.taskId, msg.metrics);
       } else {
-        LOG.warn("Received metrics for unknown job {}", msg.jobId);
+        LOG.warn("Received metrics for unknown Spark job {}", msg.sparkJobId);
       }
     }
 
     private void handle(ChannelHandlerContext ctx, JobResult msg) {
       JobHandleImpl handle = jobs.remove(msg.id);
       if (handle != null) {
-        LOG.info("Received result for {}", msg.id);
+        LOG.debug("Received result for client job {}", msg.id);
         handle.setSparkCounters(msg.sparkCounters);
         Throwable error = msg.error;
         if (error == null) {
@@ -598,7 +605,7 @@ private void handle(ChannelHandlerContext ctx, JobResult msg) {
           handle.setFailure(error);
         }
       } else {
-        LOG.warn("Received result for unknown job {}", msg.id);
+        LOG.warn("Received result for unknown client job {}", msg.id);
       }
     }
@@ -607,19 +614,24 @@ private void handle(ChannelHandlerContext ctx, JobStarted msg) {
       if (handle != null) {
         handle.changeState(JobHandle.State.STARTED);
       } else {
-        LOG.warn("Received event for unknown job {}", msg.id);
+        LOG.warn("Received event for unknown client job {}", msg.id);
       }
     }
 
     private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
       JobHandleImpl handle = jobs.get(msg.clientJobId);
       if (handle != null) {
-        LOG.info("Received spark job ID: {} for {}", msg.sparkJobId, msg.clientJobId);
+        LOG.info("Received Spark job ID: {} for client job {}", msg.sparkJobId, msg.clientJobId);
         handle.addSparkJobId(msg.sparkJobId);
       } else {
-        LOG.warn("Received spark job ID: {} for unknown job {}", msg.sparkJobId, msg.clientJobId);
+        LOG.warn("Received Spark job ID: {} for unknown client job {}", msg.sparkJobId, msg.clientJobId);
       }
     }
+
+    @Override
+    protected String name() {
+      return "HiveServer2 to Remote Spark Driver Connection";
+    }
   }
 
   private static class AddJarJob implements Job {

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
index f137007b40..6a13071345 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
@@ -44,4 +44,10 @@ public InputMetrics(TaskMetrics metrics) {
     this(metrics.inputMetrics().bytesRead());
   }
 
+  @Override
+  public String toString() {
+    return "InputMetrics{" +
+            "bytesRead=" + bytesRead +
+            '}';
+  }
 }

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index b718b3bd95..cf7a1f6c28 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -127,4 +127,22 @@ private static ShuffleWriteMetrics optionalShuffleWriteMetrics(TaskMetrics metri
     return (metrics.shuffleWriteMetrics() != null) ? new ShuffleWriteMetrics(metrics) : null;
   }
 
+  @Override
+  public String toString() {
+    return "Metrics{" +
+            "executorDeserializeTime=" + executorDeserializeTime +
+            ", executorDeserializeCpuTime=" + executorDeserializeCpuTime +
+            ", executorRunTime=" + executorRunTime +
+            ", executorCpuTime=" + executorCpuTime +
+            ", resultSize=" + resultSize +
+            ", jvmGCTime=" + jvmGCTime +
+            ", resultSerializationTime=" + resultSerializationTime +
+            ", memoryBytesSpilled=" + memoryBytesSpilled +
+            ", diskBytesSpilled=" + diskBytesSpilled +
+            ", taskDurationTime=" + taskDurationTime +
+            ", inputMetrics=" + inputMetrics +
+            ", shuffleReadMetrics=" + shuffleReadMetrics +
+            ", shuffleWriteMetrics=" + shuffleWriteMetrics +
+            '}';
+  }
 }

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
index 9ff4d0ff0f..e3d564f576 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
@@ -73,4 +73,13 @@ public long getTotalBlocksFetched() {
     return remoteBlocksFetched + localBlocksFetched;
   }
 
+  @Override
+  public String toString() {
+    return "ShuffleReadMetrics{" +
+            "remoteBlocksFetched=" + remoteBlocksFetched +
+            ", localBlocksFetched=" + localBlocksFetched +
+            ", fetchWaitTime=" + fetchWaitTime +
+            ", remoteBytesRead=" + remoteBytesRead +
+            '}';
+  }
 }

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
index 64a4b86042..e9cf6a1d9a 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
@@ -51,4 +51,11 @@ public ShuffleWriteMetrics(TaskMetrics metrics) {
         metrics.shuffleWriteMetrics().shuffleWriteTime());
   }
 
+  @Override
+  public String toString() {
+    return "ShuffleWriteMetrics{" +
+            "shuffleBytesWritten=" + shuffleBytesWritten +
+            ", shuffleWriteTime=" + shuffleWriteTime +
+            '}';
+  }
 }

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
index 5454ec2a54..d3a68125c7 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
@@ -101,7 +101,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out)
       Input kryoIn = new Input(new ByteBufferInputStream(nioBuffer));
       Object msg = kryos.get().readClassAndObject(kryoIn);
-      LOG.debug("Decoded message of type {} ({} bytes)",
+      LOG.trace("Decoded message of type {} ({} bytes)",
           msg != null ? msg.getClass().getName() : msg, msgSize);
       out.add(msg);
     } finally {
@@ -118,7 +118,7 @@ protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf)
     kryoOut.flush();
 
     byte[] msgData = maybeEncrypt(bytes.toByteArray());
-    LOG.debug("Encoded message of type {} ({} bytes)", msg.getClass().getName(), msgData.length);
+    LOG.trace("Encoded message of type {} ({} bytes)", msg.getClass().getName(), msgData.length);
     checkSize(msgData.length);
     buf.ensureWritable(msgData.length + 4);

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
index cbbfb1ca64..298a210bf1 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
@@ -19,6 +19,7 @@
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -116,7 +117,7 @@
     final Runnable timeoutTask = new Runnable() {
       @Override
       public void run() {
-        promise.setFailure(new TimeoutException("Timed out waiting for RPC server connection."));
+        promise.setFailure(new TimeoutException("Timed out waiting to connect to HiveServer2."));
       }
     };
     final ScheduledFuture timeoutFuture = eloop.schedule(timeoutTask,
@@ -272,7 +273,8 @@ public boolean isActive() {
    */
   public Future call(final Object msg, Class retType) {
     Preconditions.checkArgument(msg != null);
-    Preconditions.checkState(channel.isActive(), "RPC channel is closed.");
+    Preconditions.checkState(channel.isActive(), "Unable to send message " + msg +
+        " because the Remote Spark Driver - HiveServer2 connection has been closed.");
     try {
       final long id = rpcId.getAndIncrement();
       final Promise promise = createPromise();
@@ -280,7 +282,8 @@ public boolean isActive() {
         @Override
         public void operationComplete(ChannelFuture cf) {
           if (!cf.isSuccess() && !promise.isDone()) {
-            LOG.warn("Failed to send RPC, closing connection.", cf.cause());
+            LOG.warn("Failed to send message '" + msg + "', closing Remote Spark Driver - " +
+                "HiveServer2 connection.", cf.cause());
             promise.setFailure(cf.cause());
             dispatcher.discardRpc(id);
             close();
@@ -314,6 +317,14 @@ Channel getChannel() {
     return channel;
   }
 
+  /**
+   * Returns the "hostname:port" that the RPC is connected to
+   */
+  public String getRemoteAddress() {
+    InetSocketAddress remoteAddress = ((InetSocketAddress) this.channel.remoteAddress());
+    return remoteAddress.getHostName() + ":" + remoteAddress.getPort();
+  }
+
   void setDispatcher(RpcDispatcher dispatcher) {
     Preconditions.checkNotNull(dispatcher);
     Preconditions.checkState(this.dispatcher == null);
@@ -336,7 +347,7 @@ public void close() {
       try {
         l.rpcClosed(this);
       } catch (Exception e) {
-        LOG.warn("Error caught in Rpc.Listener invocation.", e);
+        LOG.warn("Error caught while running '" + l + "' listener", e);
       }
     }
   }
@@ -493,12 +504,10 @@ void sendHello(Channel c) throws Exception {
           client.evaluateChallenge(new byte[0]) : new byte[0];
       c.writeAndFlush(new SaslMessage(clientId, hello)).addListener(future -> {
         if (!future.isSuccess()) {
-          LOG.error("Failed to send hello to server", future.cause());
+          LOG.error("Failed to send test message to HiveServer2", future.cause());
           onError(future.cause());
         }
       });
     }
-
   }
-
 }

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index a535b8d333..090c628001 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -115,10 +115,9 @@ String getServerAddress() throws IOException {
    * Parses the port string like 49152-49222,49228 into the port list. A default 0
    * is added for the empty port string.
    * @return a list of configured ports.
-   * @exception IOException is thrown if the property is not configured properly
    */
-  List getServerPorts() throws IOException {
-    String errMsg = "Incorrect RPC server port configuration for HiveServer2";
+  List getServerPorts() {
+    String errMsg = "Malformed configuration value for " + HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname;
     String portString = config.get(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname);
     ArrayList ports = new ArrayList();
     try {
@@ -127,7 +126,7 @@ String getServerAddress() throws IOException {
           String[] range = portRange.split("-");
           if (range.length == 0 || range.length > 2 ||
               (range.length == 2 && Integer.valueOf(range[0]) > Integer.valueOf(range[1]))) {
-            throw new IOException(errMsg);
+            throw new IllegalArgumentException(errMsg);
           }
           if (range.length == 1) {
             ports.add(Integer.valueOf(range[0]));
@@ -143,7 +142,7 @@ String getServerAddress() throws IOException {
 
       return ports;
     } catch(NumberFormatException e) {
-      throw new IOException(errMsg);
+      throw new IllegalArgumentException(errMsg, e);
     }
   }

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
index 00f5a17412..b5885477fc 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
@@ -66,13 +66,12 @@ protected String name() {
   protected final void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
     if (lastHeader == null) {
       if (!(msg instanceof Rpc.MessageHeader)) {
-        LOG.warn("[{}] Expected RPC header, got {} instead.", name(),
-            msg != null ? msg.getClass().getName() : null);
-        throw new IllegalArgumentException();
+        throw new IllegalArgumentException(String.format("[%s] Expected RPC header, got %s instead.", name(),
+            msg != null ? msg.getClass().getName() : null));
       }
       lastHeader = (Rpc.MessageHeader) msg;
     } else {
-      LOG.debug("[{}] Received RPC message: type={} id={} payload={}", name(),
+      LOG.trace("[{}] Received RPC message: type={} id={} payload={}", name(),
           lastHeader.type, lastHeader.id,
           msg != null ? msg.getClass().getName() : null);
       try {
@@ -86,7 +85,7 @@ protected final void channelRead0(ChannelHandlerContext ctx, Object msg) throws
           handleError(ctx, msg, findRpc(lastHeader.id));
           break;
         default:
-          throw new IllegalArgumentException("Unknown RPC message type: " + lastHeader.type);
+          throw new IllegalArgumentException("[" + name() + "] Unknown RPC message type: " + lastHeader.type);
         }
       } finally {
         lastHeader = null;
@@ -103,7 +102,7 @@ private OutstandingRpc findRpc(long id) {
       }
     }
     throw new IllegalArgumentException(String.format(
-        "Received RPC reply for unknown RPC (%d).", id));
+        "[%s] Received RPC reply for unknown RPC (%d).", name(), id));
   }
 
   private void handleCall(ChannelHandlerContext ctx, Object msg) throws Exception {
@@ -124,7 +123,7 @@ private void handleCall(ChannelHandlerContext ctx, Object msg) throws Exception
       }
       replyType = Rpc.MessageType.REPLY;
     } catch (InvocationTargetException ite) {
-      LOG.debug(String.format("[%s] Error in RPC handler.", name()), ite.getCause());
+      LOG.error(String.format("[%s] Error in RPC handler.", name()), ite.getCause());
       replyPayload = Throwables.getStackTraceAsString(ite.getCause());
       replyType = Rpc.MessageType.ERROR;
     }
@@ -140,12 +139,12 @@ private void handleReply(ChannelHandlerContext ctx, Object msg, OutstandingRpc r
   private void handleError(ChannelHandlerContext ctx, Object msg, OutstandingRpc rpc)
       throws Exception {
     if (msg instanceof String) {
-      LOG.warn("Received error message:{}.", msg);
+      LOG.error("[{}] Received error message: {}.", name(), msg);
       rpc.future.setFailure(new RpcException((String) msg));
     } else {
       String error = String.format("Received error with unexpected payload (%s).",
           msg != null ? msg.getClass().getName() : null);
-      LOG.warn(String.format("[%s] %s", name(), error));
+      LOG.error(String.format("[%s] %s", name(), error));
       rpc.future.setFailure(new IllegalArgumentException(error));
       ctx.close();
     }
@@ -178,7 +177,7 @@ public final void channelInactive(ChannelHandlerContext ctx) throws Exception {
   }
 
   void registerRpc(long id, Promise promise, String type) {
-    LOG.debug("[{}] Registered outstanding rpc {} ({}).", name(), id, type);
+    LOG.trace("[{}] Registered outstanding rpc {} ({}).", name(), id, type);
     rpcs.add(new OutstandingRpc(id, promise));
   }
@@ -196,5 +195,4 @@ void discardRpc(long id) {
       this.future = future;
     }
   }
-
 }

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 6c6ab74ce7..f1383d6f18 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.SecureRandom;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -84,7 +85,7 @@ public RpcServer(Map mapConf) throws IOException, InterruptedExc
     this.group = new NioEventLoopGroup(
         this.config.getRpcThreadCount(),
         new ThreadFactoryBuilder()
-            .setNameFormat("RPC-Handler-%d")
+            .setNameFormat("Spark-Driver-RPC-Handler-%d")
            .setDaemon(true)
            .build());
     ServerBootstrap serverBootstrap = new ServerBootstrap()
@@ -100,7 +101,7 @@ public void initChannel(SocketChannel ch) throws Exception {
             Runnable cancelTask = new Runnable() {
               @Override
               public void run() {
-                LOG.warn("Timed out waiting for hello from client.");
+                LOG.warn("Timed out waiting for test message from Remote Spark driver.");
                 newRpc.close();
               }
             };
@@ -117,6 +118,8 @@ public void run() {
     this.port = ((InetSocketAddress) channel.localAddress()).getPort();
     this.pendingClients = Maps.newConcurrentMap();
     this.address = this.config.getServerAddress();
+
+    LOG.info("Successfully created Remote Spark Driver RPC Server with address {}:{}", this.address, this.port);
   }
 
   /**
@@ -143,7 +146,8 @@ private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap)
         // Retry the next port
       }
     }
-    throw new IOException("No available ports from configured RPC Server ports for HiveServer2");
+    throw new IOException("Remote Spark Driver RPC Server cannot bind to any of the configured ports: " +
+        Arrays.toString(config.getServerPorts().toArray()));
   }
 
 }
@@ -169,7 +173,9 @@ private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap)
     Runnable timeout = new Runnable() {
       @Override
       public void run() {
-        promise.setFailure(new TimeoutException("Timed out waiting for client connection."));
+        promise.setFailure(new TimeoutException(
+            String.format("Client '%s' timed out waiting for connection from the Remote Spark"
+                + " Driver", clientId)));
       }
     };
     ScheduledFuture timeoutFuture = group.schedule(timeout,
@@ -179,7 +185,7 @@ public void run() {
         timeoutFuture);
     if (pendingClients.putIfAbsent(clientId, client) != null) {
       throw new IllegalStateException(
-          String.format("Client '%s' already registered.", clientId));
+          String.format("Remote Spark Driver with client ID '%s' already registered", clientId));
     }
 
     promise.addListener(new GenericFutureListener>() {
@@ -208,7 +214,7 @@ public void cancelClient(final String clientId, final String msg) {
     cinfo.timeoutFuture.cancel(true);
     if (!cinfo.promise.isDone()) {
       cinfo.promise.setFailure(new RuntimeException(
-          String.format("Cancel client '%s'. Error: " + msg, clientId)));
+          String.format("Cancelling Remote Spark Driver client connection '%s' with error: " + msg, clientId)));
     }