diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 541af57..5b07204 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2847,6 +2847,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "protocol or ZK paths), similar to how ssh refuses a key with bad access permissions."), LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003, "LLAP daemon output service port"), + LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE("hive.llap.daemon.output.service.send.buffer.size", + 128 * 1024, "Send buffer size to be used by LLAP daemon output service"), SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout", "60s", new TimeValidator(TimeUnit.SECONDS), diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java index f852041..6d0de7a 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -85,6 +85,7 @@ private ServerBootstrap serverBootstrap; private ChannelFuture listeningChannelFuture; private int port; + private int sendBufferSize = 128 * 1024; private LlapOutputFormatService(Configuration conf) throws IOException { writers = new HashMap(); @@ -109,6 +110,8 @@ public void start() throws IOException { LOG.info("Starting LlapOutputFormatService"); int portFromConf = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT); + sendBufferSize = HiveConf.getIntVar(conf, + HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE); eventLoopGroup = new NioEventLoopGroup(1); serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventLoopGroup); @@ -117,7 +120,8 @@ public void start() throws IOException { try { listeningChannelFuture = serverBootstrap.bind(portFromConf).sync(); this.port = ((InetSocketAddress) listeningChannelFuture.channel().localAddress()).getPort(); - LOG.info("LlapOutputFormatService: Binding to port " + this.port); + LOG.info("LlapOutputFormatService: Binding to port: {} with send buffer size: {} ", this.port, + sendBufferSize); } catch (InterruptedException err) { throw new IOException("LlapOutputFormatService: Error binding to port " + portFromConf, err); } @@ -162,9 +166,8 @@ public void channelRead0(ChannelHandlerContext ctx, String msg) { private void registerReader(ChannelHandlerContext ctx, String id) { synchronized(INSTANCE) { - LOG.debug("registering socket for: "+id); - int bufSize = 128 * 1024; // configable? - OutputStream stream = new ChannelOutputStream(ctx, id, bufSize); + LOG.debug("registering socket for: " + id); + OutputStream stream = new ChannelOutputStream(ctx, id, sendBufferSize); LlapRecordWriter writer = new LlapRecordWriter(stream); writers.put(id, writer);