diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java index ae75785..a4bfd69 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java @@ -105,18 +105,18 @@ protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) kryoOut.flush(); byte[] msgData = bytes.toByteArray(); + LOG.debug("Encoded message of type {} ({} bytes)", msg.getClass().getName(), msgData.length); checkSize(msgData.length); buf.ensureWritable(msgData.length + 4); buf.writeInt(msgData.length); buf.writeBytes(msgData); - LOG.debug("Encoded message of type {} ({} bytes)", msg.getClass().getName(), msgData.length); } private void checkSize(int msgSize) { - Preconditions.checkArgument(msgSize > 0, "Message size must be positive."); + Preconditions.checkArgument(msgSize > 0, "Message size (%s bytes) must be positive.", msgSize); Preconditions.checkArgument(maxMessageSize <= 0 || msgSize <= maxMessageSize, - "Message exceeds maximum allowed size (%s bytes).", maxMessageSize); + "Message (%s bytes) exceeds maximum allowed size (%s bytes).", maxMessageSize, msgSize); } } diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java index ebaf616..5149902 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java @@ -268,8 +268,10 @@ public void addListener(Listener l) { @Override public void operationComplete(ChannelFuture cf) { if (!cf.isSuccess() && !promise.isDone()) { + LOG.warn("Failed to send RPC, closing connection.", cf.cause()); promise.setFailure(cf.cause()); dispatcher.get().discardRpc(id); + close(); } } }; diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java index fa6d73e..584ee48 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java @@ -65,7 +65,7 @@ /** Maximum message size. Default = 10MB. */ public final static String RPC_MAX_MESSAGE_SIZE_KEY = "hive.spark.client.rpc.max.size"; - public final static int RPC_MAX_MESSAGE_SIZE_DEFAULT = 10 * 1024 * 1024; + public final static int RPC_MAX_MESSAGE_SIZE_DEFAULT = 50 * 1024 * 1024; /** Channel logging level. */ public final static String RPC_CHANNEL_LOG_LEVEL_KEY = "hive.spark.client.channel.log.level";