commit af202ad8990a5f3c2484de73087dfad3c63bec2e
Author: Sahil Takiar
Date:   Thu Oct 19 08:14:18 2017 -0700

    HIVE-17838: Make org.apache.hive.spark.client.rpc logging HoS specific and other logging cleanup

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
index 093032525a..a30ab17161 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
@@ -18,6 +18,7 @@
 package org.apache.hive.spark.client;
 
 import java.io.Serializable;
+import java.util.List;
 
 import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.rpc.RpcDispatcher;
@@ -27,7 +28,7 @@
 
 abstract class BaseProtocol extends RpcDispatcher {
 
-  protected static class CancelJob implements Serializable {
+  protected static class CancelJob implements Serializable, Message {
 
     final String id;
 
@@ -39,13 +40,21 @@
       this(null);
     }
 
+    @Override
+    public String getDescription() {
+      return "Cancel Job";
+    }
   }
 
-  protected static class EndSession implements Serializable {
+  protected static class EndSession implements Serializable, Message {
 
+    @Override
+    public String getDescription() {
+      return "End Session";
+    }
   }
 
-  protected static class Error implements Serializable {
+  protected static class Error implements Serializable, Message {
 
     final String cause;
 
@@ -61,9 +70,13 @@
       this(null);
     }
 
+    @Override
+    public String getDescription() {
+      return "Error Message";
+    }
   }
 
-  protected static class JobMetrics implements Serializable {
+  protected static class JobMetrics implements Serializable, Message {
 
     final String jobId;
     final int sparkJobId;
@@ -83,9 +96,13 @@
       this(null, -1, -1, -1, null);
     }
 
+    @Override
+    public String getDescription() {
+      return "Job Metrics";
+    }
   }
 
-  protected static class JobRequest<T extends Serializable> implements Serializable {
+  protected static class JobRequest<T extends Serializable> implements Serializable, Message {
 
     final String id;
     final Job<T> job;
@@ -99,9 +116,13 @@
       this(null, null);
     }
 
+    @Override
+    public String getDescription() {
+      return "Job Request";
+    }
   }
 
-  protected static class JobResult<T extends Serializable> implements Serializable {
+  protected static class JobResult<T extends Serializable> implements Serializable, Message {
 
     final String id;
     final T result;
@@ -119,9 +140,13 @@
       this(null, null, null, null);
     }
 
+    @Override
+    public String getDescription() {
+      return "Job Result";
+    }
   }
 
-  protected static class JobStarted implements Serializable {
+  protected static class JobStarted implements Serializable, Message {
 
     final String id;
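The pattern repeated in the hunks above is uniform: each protocol message class additionally implements the new Message interface and returns a fixed, human-readable description. Pulled out of the diff, a minimal self-contained sketch of the idea looks like this (CancelJob is a simplified stand-in for the real class, and logSendFailure is a hypothetical caller, not part of the patch):

    import java.io.Serializable;

    public class MessageDescriptionDemo {

      // Mirrors the Message interface this patch introduces.
      interface Message {
        String getDescription();
      }

      // Simplified stand-in for BaseProtocol.CancelJob.
      static class CancelJob implements Serializable, Message {
        final String id;

        CancelJob(String id) {
          this.id = id;
        }

        @Override
        public String getDescription() {
          return "Cancel Job";
        }
      }

      // Descriptions yield readable log lines ("Failed to send message
      // 'Cancel Job'") instead of bare class names or toString() output.
      static void logSendFailure(Message msg, Throwable cause) {
        System.err.println("Failed to send message '" + msg.getDescription() + "': " + cause);
      }

      public static void main(String[] args) {
        logSendFailure(new CancelJob("client-job-7"), new RuntimeException("channel closed"));
      }
    }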
@@ -133,12 +158,16 @@
       this(null);
     }
 
+    @Override
+    public String getDescription() {
+      return "Job Started";
+    }
   }
 
   /**
   * Inform the client that a new spark job has been submitted for the client job.
   */
-  protected static class JobSubmitted implements Serializable {
+  protected static class JobSubmitted implements Serializable, Message {
 
     final String clientJobId;
     final int sparkJobId;
@@ -150,9 +179,14 @@
     JobSubmitted() {
       this(null, -1);
     }
+
+    @Override
+    public String getDescription() {
+      return "Job Submitted";
+    }
   }
 
-  protected static class SyncJobRequest<T extends Serializable> implements Serializable {
+  protected static class SyncJobRequest<T extends Serializable> implements Serializable, Message {
 
     final Job<T> job;
 
@@ -164,6 +198,9 @@
       this(null);
     }
 
+    @Override
+    public String getDescription() {
+      return "Sync Job";
+    }
   }
-
 }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/Message.java b/spark-client/src/main/java/org/apache/hive/spark/client/Message.java
new file mode 100644
index 0000000000..742fc747b4
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/Message.java
@@ -0,0 +1,6 @@
+package org.apache.hive.spark.client;
+
+public interface Message {
+
+  String getDescription();
+}
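Message.java itself is deliberately tiny. The same description idea is applied to Rpc.Listener later in this patch, so every close listener registered in the files below also names itself. A minimal sketch of that expanded listener contract, with Rpc stubbed out as Object (illustrative only):

    public class ListenerDescriptionDemo {

      // Mirrors the expanded Rpc.Listener: a close callback plus a
      // description that can be logged if the listener itself throws.
      interface Listener {
        void rpcClosed(Object rpc);

        String getDescription();
      }

      public static void main(String[] args) {
        Listener listener = new Listener() {
          @Override
          public void rpcClosed(Object rpc) {
            System.out.println("connection closed");
          }

          @Override
          public String getDescription() {
            return "Demo Close Listener";
          }
        };

        // On a listener failure the RPC layer can now log, for example:
        //   Error caught while running 'Demo Close Listener' listener.
        listener.rpcClosed(new Object());
      }
    }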
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index ede8ce9e40..dd46d0db23 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -25,6 +25,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -111,19 +112,19 @@ private RemoteDriver(String[] args) throws Exception {
         String[] val = getArg(args, idx).split("[=]", 2);
         conf.set(val[0], val[1]);
       } else {
-        throw new IllegalArgumentException("Invalid command line: "
+        throw new IllegalArgumentException("Invalid command line arguments: "
           + Joiner.on(" ").join(args));
       }
     }
 
     executor = Executors.newCachedThreadPool();
 
-    LOG.info("Connecting to: {}:{}", serverAddress, serverPort);
+    LOG.info("Connecting to HiveServer2: {}:{}", serverAddress, serverPort);
 
     Map<String, String> mapConf = Maps.newHashMap();
     for (Tuple2<String, String> e : conf.getAll()) {
       mapConf.put(e._1(), e._2());
-      LOG.debug("Remote Driver configured with: " + e._1() + "=" + e._2());
+      LOG.debug("Remote Spark Driver configured with: " + e._1() + "=" + e._2());
     }
 
     String clientId = mapConf.get(SparkClientFactory.CONF_CLIENT_ID);
@@ -135,7 +136,7 @@ private RemoteDriver(String[] args) throws Exception {
     this.egroup = new NioEventLoopGroup(
         threadCount,
         new ThreadFactoryBuilder()
-            .setNameFormat("Driver-RPC-Handler-%d")
+            .setNameFormat("Remote-Spark-Driver-RPC-Handler-%d")
             .setDaemon(true)
             .build());
     this.protocol = new DriverProtocol();
@@ -148,9 +149,14 @@ private RemoteDriver(String[] args) throws Exception {
     this.clientRpc.addListener(new Rpc.Listener() {
       @Override
       public void rpcClosed(Rpc rpc) {
-        LOG.warn("Shutting down driver because RPC channel was closed.");
+        LOG.warn("Shutting down driver because Remote Spark Driver to HiveServer2 connection was closed.");
         shutdown(null);
       }
+
+      @Override
+      public String getDescription() {
+        return "Shutting Down Remote Spark Driver to HiveServer2 Connection";
+      }
     });
 
     try {
@@ -195,7 +201,7 @@ private void submit(JobWrapper job) {
     if (jc != null) {
       job.submit();
     } else {
-      LOG.info("SparkContext not yet up, queueing job request.");
+      LOG.info("SparkContext not yet up; adding Hive on Spark job request to the queue.");
       jobQueue.add(job);
     }
   }
@@ -204,9 +210,9 @@ private synchronized void shutdown(Throwable error) {
     if (running) {
       if (error == null) {
-        LOG.info("Shutting down remote driver.");
+        LOG.info("Shutting down Remote Spark Driver.");
       } else {
-        LOG.error("Shutting down remote driver due to error: " + error, error);
+        LOG.error("Shutting down Remote Spark Driver due to error: " + error, error);
       }
       running = false;
       for (JobWrapper job : activeJobs.values()) {
@@ -237,7 +243,7 @@ private boolean cancelJob(JobWrapper job) {
   private String getArg(String[] args, int keyIdx) {
     int valIdx = keyIdx + 1;
     if (args.length <= valIdx) {
-      throw new IllegalArgumentException("Invalid command line: "
+      throw new IllegalArgumentException("Invalid command line arguments: "
         + Joiner.on(" ").join(args));
     }
     return args[valIdx];
@@ -273,7 +279,7 @@ void sendMetrics(String jobId, int sparkJobId, int stageId, long taskId, Metrics
     private void handle(ChannelHandlerContext ctx, CancelJob msg) {
       JobWrapper job = activeJobs.get(msg.id);
       if (job == null || !cancelJob(job)) {
-        LOG.info("Requested to cancel an already finished job.");
+        LOG.info("Requested to cancel an already finished client job.");
       }
     }
@@ -283,7 +289,7 @@ private void handle(ChannelHandlerContext ctx, EndSession msg) {
     }
 
     private void handle(ChannelHandlerContext ctx, JobRequest msg) {
-      LOG.info("Received job request {}", msg.id);
+      LOG.info("Received client job request {}", msg.id);
       JobWrapper wrapper = new JobWrapper(msg);
       activeJobs.put(msg.id, wrapper);
       submit(wrapper);
@@ -297,7 +303,7 @@ private Object handle(ChannelHandlerContext ctx, SyncJobRequest msg) throws Exce
       while (jc == null) {
         jcLock.wait();
         if (!running) {
-          throw new IllegalStateException("Remote context is shutting down.");
+          throw new IllegalStateException("Remote Spark context is shutting down.");
         }
       }
     }
@@ -318,6 +324,10 @@ public void call(JavaFutureAction future,
       }
     }
 
+    @Override
+    public String name() {
+      return "Remote Spark Driver to HiveServer2 Connection";
+    }
   }
 
   private class JobWrapper implements Callable {
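The name() override above supplies the "[...]" prefix that RpcDispatcher (changed later in this patch) stamps on its log lines; that prefix is what lets the two directions of the HiveServer2/driver link be told apart in a mixed log. The mechanism in isolation, a simplified sketch using System.out instead of SLF4J:

    public class ConnectionNameDemo {

      // Simplified stand-in for RpcDispatcher: subclasses name the
      // connection they serve, and every log line carries that name.
      abstract static class Dispatcher {
        protected abstract String name();

        protected void logUnknownMessage(Object msg) {
          System.out.printf("[%s] Received message of unknown type: %s%n",
              name(), msg != null ? msg.getClass().getName() : null);
        }
      }

      public static void main(String[] args) {
        Dispatcher driverSide = new Dispatcher() {
          @Override
          protected String name() {
            return "Remote Spark Driver to HiveServer2 Connection";
          }
        };
        driverSide.logUnknownMessage("some payload");
      }
    }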
@@ -383,12 +393,13 @@ public void call(JavaFutureAction future,
         if (sparkCounters != null) {
           counters = sparkCounters.snapshot();
         }
+        protocol.jobFinished(req.id, result, null, counters);
       } catch (Throwable t) {
         // Catch throwables in a best-effort to report job status back to the client. It's
         // re-thrown so that the executor can destroy the affected thread (or the JVM can
         // die or whatever would happen if the throwable bubbled up).
-        LOG.info("Failed to run job " + req.id, t);
+        LOG.info("Failed to run client job " + req.id, t);
         protocol.jobFinished(req.id, null, t,
             sparkCounters != null ? sparkCounters.snapshot() : null);
         throw new ExecutionException(t);
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
index 8cedd30e1b..26b2b15bbb 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
@@ -88,5 +88,4 @@ public static SparkClient createClient(Map<String, String> sparkConf, HiveConf h
     Preconditions.checkState(server != null, "initialize() not called.");
     return new SparkClientImpl(server, sparkConf, hiveConf);
   }
-
 }
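The catch block in the JobWrapper hunk above ("Catch throwables in a best-effort to report job status back to the client") is a reusable pattern: report the failure to the remote side first, then rethrow so the executing thread still sees the original Throwable. A minimal sketch, with reportFailure standing in for protocol.jobFinished (assumed names, not the patch's API):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;

    public abstract class BestEffortJob implements Callable<Void> {

      protected abstract void runJob() throws Exception;

      // Stand-in for protocol.jobFinished(id, null, t, counters).
      protected abstract void reportFailure(Throwable t);

      @Override
      public Void call() throws ExecutionException {
        try {
          runJob();
        } catch (Throwable t) {
          // Best effort: tell the remote side first, then rethrow so the
          // executor can destroy the affected thread.
          reportFailure(t);
          throw new ExecutionException(t);
        }
        return null;
      }
    }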
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index e0ec3b79fc..1ced25cd93 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -42,6 +42,7 @@
 import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -103,20 +104,19 @@
       // The RPC server will take care of timeouts here.
       this.driverRpc = rpcServer.registerClient(clientId, secret, protocol).get();
     } catch (Throwable e) {
-      String errorMsg = null;
+      String errorMsg;
       if (e.getCause() instanceof TimeoutException) {
-        errorMsg = "Timed out waiting for client to connect.\nPossible reasons include network " +
-            "issues, errors in remote driver or the cluster has no available resources, etc." +
+        errorMsg = "Timed out waiting for Remote Spark Driver to connect to HiveServer2.\nPossible reasons " +
+            "include network issues, errors in the Remote Spark Driver, no available resources on the cluster, etc." +
             "\nPlease check YARN or Spark driver's logs for further information.";
       } else if (e.getCause() instanceof InterruptedException) {
-        errorMsg = "Interruption occurred while waiting for client to connect.\nPossibly the Spark session is closed "
-            + "such as in case of query cancellation."
-            + "\nPlease refer to HiveServer2 logs for further information.";
+        errorMsg = "Interrupted while waiting for Remote Spark Driver to connect to HiveServer2.\nIt is possible " +
+            "that the query was cancelled, which would cause the Spark session to close.";
       } else {
-        errorMsg = "Error while waiting for client to connect.";
+        errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2.";
       }
       LOG.error(errorMsg, e);
-      driverThread.interrupt();
+      driverThread.interrupt(); // TODO cleanup
       try {
         driverThread.join();
       } catch (InterruptedException ie) {
@@ -126,14 +126,21 @@
       throw Throwables.propagate(e);
     }
 
+    LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress());
+
     driverRpc.addListener(new Rpc.Listener() {
       @Override
       public void rpcClosed(Rpc rpc) {
         if (isAlive) {
-          LOG.warn("Client RPC channel closed unexpectedly.");
+          LOG.warn("Connection to Remote Spark Driver closed unexpectedly.");
           isAlive = false;
         }
       }
+
+      @Override
+      public String getDescription() {
+        return "Connection to Remote Spark Driver Closed Unexpectedly";
+      }
     });
     isAlive = true;
   }
@@ -285,7 +292,7 @@ public void run() {
         try {
           URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
           if (sparkDefaultsUrl != null) {
-            LOG.info("Loading spark defaults: " + sparkDefaultsUrl);
+            LOG.info("Loading Spark defaults configs from: " + sparkDefaultsUrl);
             allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
           }
         } catch (Exception e) {
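The run() hunk above loads spark-defaults.conf from the classpath. Reduced to its essentials (the resource name matches the patch; java.util.Properties replaces the surrounding Hive plumbing):

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.Properties;

    public final class SparkDefaultsLoader {

      // Look up spark-defaults.conf via the context classloader and, if
      // present, load it as a properties file. A missing file is not an error.
      public static Properties load() throws IOException {
        Properties props = new Properties();
        URL url = Thread.currentThread().getContextClassLoader()
            .getResource("spark-defaults.conf");
        if (url != null) {
          try (InputStream in = url.openStream()) {
            props.load(in);
          }
        }
        return props;
      }
    }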
@@ -597,7 +604,7 @@ void cancel(String jobId) {
     }
 
     private void handle(ChannelHandlerContext ctx, Error msg) {
-      LOG.warn("Error reported from remote driver.", msg.cause);
+      LOG.warn("Error reported from Remote Spark Driver.", msg.cause);
     }
 
     private void handle(ChannelHandlerContext ctx, JobMetrics msg) {
@@ -605,14 +612,14 @@
       if (handle != null) {
         handle.getMetrics().addMetrics(msg.sparkJobId, msg.stageId, msg.taskId, msg.metrics);
       } else {
-        LOG.warn("Received metrics for unknown job {}", msg.jobId);
+        LOG.warn("Received metrics for unknown client job {}", msg.jobId);
       }
     }
 
     private void handle(ChannelHandlerContext ctx, JobResult msg) {
       JobHandleImpl handle = jobs.remove(msg.id);
       if (handle != null) {
-        LOG.info("Received result for {}", msg.id);
+        LOG.debug("Received result for client job {}", msg.id);
         handle.setSparkCounters(msg.sparkCounters);
         Throwable error = msg.error != null ? new SparkException(msg.error) : null;
         if (error == null) {
@@ -621,7 +628,7 @@
         handle.setFailure(error);
         }
       } else {
-        LOG.warn("Received result for unknown job {}", msg.id);
+        LOG.warn("Received result for unknown client job {}", msg.id);
       }
     }
@@ -630,20 +637,24 @@ private void handle(ChannelHandlerContext ctx, JobStarted msg) {
       if (handle != null) {
         handle.changeState(JobHandle.State.STARTED);
       } else {
-        LOG.warn("Received event for unknown job {}", msg.id);
+        LOG.warn("Received event for unknown client job {}", msg.id);
       }
     }
 
     private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
       JobHandleImpl handle = jobs.get(msg.clientJobId);
       if (handle != null) {
-        LOG.info("Received spark job ID: {} for {}", msg.sparkJobId, msg.clientJobId);
+        LOG.info("Received Spark job ID: {} for client job {}", msg.sparkJobId, msg.clientJobId);
         handle.addSparkJobId(msg.sparkJobId);
       } else {
-        LOG.warn("Received spark job ID: {} for unknown job {}", msg.sparkJobId, msg.clientJobId);
+        LOG.warn("Received Spark job ID: {} for unknown client job {}", msg.sparkJobId, msg.clientJobId);
      }
     }
 
+    @Override
+    protected String name() {
+      return "HiveServer2 to Remote Spark Driver Connection";
+    }
   }
 
   private static class AddJarJob implements Job {
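The handlers above consistently separate a client job (the String id minted by HiveServer2) from the numeric Spark job ids the driver later reports for it; the reworded log messages make that distinction explicit. The underlying bookkeeping, reduced to a sketch (the real code stores JobHandleImpl objects, not bare lists):

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;

    public final class ClientJobRegistry {

      // client job id -> Spark job ids reported so far for that job
      private final Map<String, List<Integer>> jobs = new ConcurrentHashMap<>();

      public void register(String clientJobId) {
        jobs.put(clientJobId, new CopyOnWriteArrayList<Integer>());
      }

      // Invoked when a JobSubmitted message arrives; returns false for an
      // unknown client job so the caller can log a warning.
      public boolean addSparkJobId(String clientJobId, int sparkJobId) {
        List<Integer> ids = jobs.get(clientJobId);
        if (ids == null) {
          return false;
        }
        ids.add(sparkJobId);
        return true;
      }
    }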
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
index cbbfb1ca64..f04ef63d6a 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
@@ -19,6 +19,8 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -59,6 +61,7 @@
 import io.netty.util.concurrent.ImmediateEventExecutor;
 import io.netty.util.concurrent.Promise;
 import io.netty.util.concurrent.ScheduledFuture;
+import org.apache.hive.spark.client.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,7 +119,7 @@
     final Runnable timeoutTask = new Runnable() {
       @Override
       public void run() {
-        promise.setFailure(new TimeoutException("Timed out waiting for RPC server connection."));
+        promise.setFailure(new TimeoutException("Timed out waiting to connect to HiveServer2."));
       }
     };
     final ScheduledFuture timeoutFuture = eloop.schedule(timeoutTask,
@@ -254,7 +257,7 @@ public void addListener(Listener l) {
   /**
    * Send an RPC call to the remote endpoint and returns a future that can be used to monitor the
    * operation.
    */
-  public Future<Void> call(Object msg) {
+  public Future<Void> call(Message msg) {
     return call(msg, Void.class);
   }
@@ -270,9 +273,9 @@ public boolean isActive() {
    * @param retType Type of expected reply.
    * @return A future used to monitor the operation.
    */
-  public <T> Future<T> call(final Object msg, Class<T> retType) {
+  public <T> Future<T> call(final Message msg, Class<T> retType) {
     Preconditions.checkArgument(msg != null);
-    Preconditions.checkState(channel.isActive(), "RPC channel is closed.");
+    Preconditions.checkState(channel.isActive(), "Remote Spark Driver - HiveServer2 connection is closed.");
     try {
       final long id = rpcId.getAndIncrement();
       final Promise<T> promise = createPromise();
@@ -280,7 +283,8 @@
         @Override
         public void operationComplete(ChannelFuture cf) {
           if (!cf.isSuccess() && !promise.isDone()) {
-            LOG.warn("Failed to send RPC, closing connection.", cf.cause());
+            LOG.warn("Failed to send message '" + msg.getDescription() +
+                "', closing Remote Spark Driver - HiveServer2 connection.", cf.cause());
             promise.setFailure(cf.cause());
             dispatcher.discardRpc(id);
             close();
@@ -314,6 +318,14 @@ Channel getChannel() {
     return channel;
   }
 
+  /**
+   * Returns the "hostname:port" of the remote endpoint that this RPC is connected to.
+   */
+  public String getRemoteAddress() {
+    InetSocketAddress remoteAddress = ((InetSocketAddress) this.channel.remoteAddress());
+    return remoteAddress.getHostName() + ":" + remoteAddress.getPort();
+  }
+
   void setDispatcher(RpcDispatcher dispatcher) {
     Preconditions.checkNotNull(dispatcher);
     Preconditions.checkState(this.dispatcher == null);
@@ -336,7 +348,7 @@ public void close() {
         try {
           l.rpcClosed(this);
         } catch (Exception e) {
-          LOG.warn("Error caught in Rpc.Listener invocation.", e);
+          LOG.warn("Error caught while running '" + l.getDescription() + "' listener.", e);
         }
       }
     }
@@ -347,6 +359,7 @@
 
     void rpcClosed(Rpc rpc);
 
+    String getDescription();
   }
 
   static enum MessageType {
@@ -493,12 +506,10 @@ void sendHello(Channel c) throws Exception {
           client.evaluateChallenge(new byte[0]) : new byte[0];
       c.writeAndFlush(new SaslMessage(clientId, hello)).addListener(future -> {
         if (!future.isSuccess()) {
-          LOG.error("Failed to send hello to server", future.cause());
+          LOG.error("Failed to send opening message to HiveServer2", future.cause());
           onError(future.cause());
         }
       });
     }
-
   }
-
 }
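Narrowing call(...) from Object to Message is the compile-time half of this change: anything sent over the connection must now carry a description for the send-failure log above. A stubbed sketch of the resulting API shape (FakeRpc is illustrative; the real Rpc returns Netty futures and actually serializes the message):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;

    public final class TypedCallDemo {

      interface Message {
        String getDescription();
      }

      static final class FakeRpc {
        // Only Message implementations compile here; a plain Object no longer does.
        <T> Future<T> call(Message msg, Class<T> retType) {
          // Real code registers an outstanding RPC and, on a failed send,
          // logs msg.getDescription() before failing the returned future.
          return new CompletableFuture<T>();
        }

        Future<Void> call(Message msg) {
          return call(msg, Void.class);
        }
      }

      public static void main(String[] args) {
        FakeRpc rpc = new FakeRpc();
        // Message has a single abstract method, so a lambda works for a demo.
        Future<Void> done = rpc.call(() -> "Cancel Job");
        System.out.println("sent: " + done);
      }
    }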
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
index 00f5a17412..e38e4b3f26 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
@@ -66,13 +66,12 @@ protected String name() {
   protected final void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
     if (lastHeader == null) {
       if (!(msg instanceof Rpc.MessageHeader)) {
-        LOG.warn("[{}] Expected RPC header, got {} instead.", name(),
-            msg != null ? msg.getClass().getName() : null);
-        throw new IllegalArgumentException();
+        throw new IllegalArgumentException(String.format("[%s] Expected RPC header, got %s instead.", name(),
+            msg != null ? msg.getClass().getName() : null));
       }
       lastHeader = (Rpc.MessageHeader) msg;
     } else {
-      LOG.debug("[{}] Received RPC message: type={} id={} payload={}", name(),
+      LOG.trace("[{}] Received RPC message: type={} id={} payload={}", name(),
           lastHeader.type, lastHeader.id, msg != null ? msg.getClass().getName() : null);
       try {
         switch (lastHeader.type) {
@@ -86,7 +85,7 @@
           handleError(ctx, msg, findRpc(lastHeader.id));
           break;
         default:
-          throw new IllegalArgumentException("Unknown RPC message type: " + lastHeader.type);
+          throw new IllegalArgumentException("[" + name() + "] Unknown RPC message type: " + lastHeader.type);
         }
       } finally {
         lastHeader = null;
@@ -103,7 +102,7 @@ private OutstandingRpc findRpc(long id) {
       }
     }
     throw new IllegalArgumentException(String.format(
-        "Received RPC reply for unknown RPC (%d).", id));
+        "[%s] Received RPC reply for unknown RPC (%d).", name(), id));
   }
 
   private void handleCall(ChannelHandlerContext ctx, Object msg) throws Exception {
@@ -124,7 +123,7 @@ private void handleCall(ChannelHandlerContext ctx, Object msg) throws Exception
       }
       replyType = Rpc.MessageType.REPLY;
     } catch (InvocationTargetException ite) {
-      LOG.debug(String.format("[%s] Error in RPC handler.", name()), ite.getCause());
+      LOG.error(String.format("[%s] Error in RPC handler.", name()), ite.getCause());
       replyPayload = Throwables.getStackTraceAsString(ite.getCause());
       replyType = Rpc.MessageType.ERROR;
     }
@@ -140,7 +139,7 @@ private void handleReply(ChannelHandlerContext ctx, Object msg, OutstandingRpc r
   private void handleError(ChannelHandlerContext ctx, Object msg, OutstandingRpc rpc)
       throws Exception {
     if (msg instanceof String) {
-      LOG.warn("Received error message:{}.", msg);
+      LOG.warn("[{}] Received error message: {}.", name(), msg);
       rpc.future.setFailure(new RpcException((String) msg));
     } else {
       String error = String.format("Received error with unexpected payload (%s).",
@@ -178,7 +177,7 @@ public final void channelInactive(ChannelHandlerContext ctx) throws Exception {
   }
 
   void registerRpc(long id, Promise promise, String type) {
-    LOG.debug("[{}] Registered outstanding rpc {} ({}).", name(), id, type);
+    LOG.trace("[{}] Registered outstanding rpc {} ({}).", name(), id, type);
     rpcs.add(new OutstandingRpc(id, promise));
   }
@@ -196,5 +195,4 @@ void discardRpc(long id) {
       this.future = future;
     }
   }
-
 }
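channelRead0 above implements a two-frame protocol: a header frame (type plus id) arrives first, then the payload, so the dispatcher parks the header until its payload shows up and always clears it afterwards. The state machine in isolation (types simplified; the real class dispatches payloads via reflection):

    public final class HeaderPayloadFraming {

      enum Type { CALL, REPLY, ERROR }

      static final class Header {
        final Type type;
        final long id;

        Header(Type type, long id) {
          this.type = type;
          this.id = id;
        }
      }

      private Header lastHeader;

      public void onFrame(Object frame) {
        if (lastHeader == null) {
          // The first frame of every message must be a header.
          if (!(frame instanceof Header)) {
            throw new IllegalArgumentException("Expected RPC header, got "
                + (frame != null ? frame.getClass().getName() : null) + " instead.");
          }
          lastHeader = (Header) frame;
        } else {
          try {
            System.out.printf("dispatching type=%s id=%d payload=%s%n",
                lastHeader.type, lastHeader.id, frame);
          } finally {
            lastHeader = null;  // reset even if the handler throws
          }
        }
      }

      public static void main(String[] args) {
        HeaderPayloadFraming framing = new HeaderPayloadFraming();
        framing.onFrame(new Header(Type.CALL, 1L));
        framing.onFrame("payload");
      }
    }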
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 6c6ab74ce7..83c2395ab0 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -84,7 +84,7 @@ public RpcServer(Map<String, String> mapConf) throws IOException, InterruptedExc
     this.group = new NioEventLoopGroup(
         this.config.getRpcThreadCount(),
         new ThreadFactoryBuilder()
-            .setNameFormat("RPC-Handler-%d")
+            .setNameFormat("Remote-Spark-Driver-RPC-Handler-%d")
             .setDaemon(true)
             .build());
     ServerBootstrap serverBootstrap = new ServerBootstrap()
@@ -100,7 +100,7 @@ public void initChannel(SocketChannel ch) throws Exception {
           Runnable cancelTask = new Runnable() {
             @Override
             public void run() {
-              LOG.warn("Timed out waiting for hello from client.");
+              LOG.warn("Timed out waiting for opening message from Remote Spark Driver.");
               newRpc.close();
             }
           };
@@ -169,7 +169,7 @@ private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap)
     Runnable timeout = new Runnable() {
       @Override
       public void run() {
-        promise.setFailure(new TimeoutException("Timed out waiting for client connection."));
+        promise.setFailure(new TimeoutException("Timed out waiting for Remote Spark Driver connection."));
       }
     };
     ScheduledFuture timeoutFuture = group.schedule(timeout,
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
index 21e4595806..1c80f8f55f 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
@@ -47,6 +47,7 @@
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.spark.client.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.junit.After;
@@ -221,6 +222,11 @@ public void testCloseListener() throws Exception {
       public void rpcClosed(Rpc rpc) {
        closeCount.incrementAndGet();
       }
+
+      @Override
+      public String getDescription() {
+        return "Test Close Count Listener";
+      }
     });
 
     client.close();
@@ -363,7 +369,7 @@ private void transfer(Rpc serverRpc, Rpc clientRpc) {
     return new Rpc[]{serverRpc, clientRpc};
   }
 
-  private static class TestMessage {
+  private static class TestMessage implements Message {
 
     final String message;
 
@@ -375,9 +381,13 @@ public TestMessage(String message) {
       this.message = message;
     }
 
+    @Override
+    public String getDescription() {
+      return "Test Message";
+    }
   }
 
-  private static class ErrorCall {
+  private static class ErrorCall implements Message {
 
     final String error;
 
@@ -389,14 +399,22 @@ public ErrorCall(String error) {
       this.error = error;
     }
 
+    @Override
+    public String getDescription() {
+      return "Error Call";
+    }
   }
 
-  private static class NotDeserializable {
+  private static class NotDeserializable implements Message {
 
     NotDeserializable(int unused) {
 
     }
 
+    @Override
+    public String getDescription() {
+      return "Not Deserializable";
+    }
   }
 
   private static class TestDispatcher extends RpcDispatcher {