diff --git spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index e486486..c873e8a 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -17,6 +17,7 @@ package org.apache.hive.spark.client; +import com.google.common.base.Throwables; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.nio.NioEventLoopGroup; @@ -224,20 +225,24 @@ private String getArg(String[] args, int keyIdx) { private class DriverProtocol extends BaseProtocol { void sendError(Throwable error) { + LOG.debug("Send error to Client: {}", Throwables.getStackTraceAsString(error)); clientRpc.call(new Error(error)); } void jobFinished(String jobId, T result, Throwable error, SparkCounters counters) { + LOG.debug("Send job({}) result to Client.", jobId); clientRpc.call(new JobResult(jobId, result, error, counters)); } void jobSubmitted(String jobId, int sparkJobId) { + LOG.debug("Send job({}/{}) submitted to Client.", jobId, sparkJobId); clientRpc.call(new JobSubmitted(jobId, sparkJobId)); } - void sendMetrics(String clientId, int jobId, int stageId, long taskId, Metrics metrics) { - clientRpc.call(new JobMetrics(clientId, jobId, stageId, taskId, metrics)); + void sendMetrics(String jobId, int sparkJobId, int stageId, long taskId, Metrics metrics) { + LOG.debug("Send task({}/{}/{}/{}) metric to Client.", jobId, sparkJobId, stageId, taskId); + clientRpc.call(new JobMetrics(jobId, sparkJobId, stageId, taskId, metrics)); } private void handle(ChannelHandlerContext ctx, CancelJob msg) { diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index aa8e60e..54eacfd 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ 
spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -346,6 +346,7 @@ private void redirect(String name, InputStream in) { jobs.put(jobId, handle); final io.netty.util.concurrent.Future rpc = driverRpc.call(new JobRequest(jobId, job)); + LOG.debug("Send JobRequest[{}].", jobId); // Link the RPC and the promise so that events from one are propagated to the other as // needed. diff --git spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java index 3db4b52..ab8f204 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java +++ spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java @@ -139,6 +139,7 @@ private void handleReply(ChannelHandlerContext ctx, Object msg, OutstandingRpc r private void handleError(ChannelHandlerContext ctx, Object msg, OutstandingRpc rpc) throws Exception { if (msg instanceof String) { + LOG.warn("Received error message: {}.", msg); rpc.future.setFailure(new RpcException((String) msg)); } else { String error = String.format("Received error with unexpected payload (%s).",