diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 541af57..5b07204 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2847,6 +2847,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "protocol or ZK paths), similar to how ssh refuses a key with bad access permissions."), LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003, "LLAP daemon output service port"), + LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE("hive.llap.daemon.output.service.send.buffer.size", + 128 * 1024, "Send buffer size to be used by LLAP daemon output service"), SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout", "60s", new TimeValidator(TimeUnit.SECONDS), diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java index f852041..06660b3 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -109,15 +109,18 @@ public void start() throws IOException { LOG.info("Starting LlapOutputFormatService"); int portFromConf = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT); + int sendBufferSize = HiveConf.getIntVar(conf, + HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE); eventLoopGroup = new NioEventLoopGroup(1); serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventLoopGroup); serverBootstrap.channel(NioServerSocketChannel.class); - serverBootstrap.childHandler(new LlapOutputFormatServiceChannelHandler()); + serverBootstrap.childHandler(new LlapOutputFormatServiceChannelHandler(sendBufferSize)); try { listeningChannelFuture = serverBootstrap.bind(portFromConf).sync(); this.port = ((InetSocketAddress) listeningChannelFuture.channel().localAddress()).getPort(); - LOG.info("LlapOutputFormatService: Binding to port " + this.port); + LOG.info("LlapOutputFormatService: Binding to port: {} with send buffer size: {} ", this.port, + sendBufferSize); } catch (InterruptedException err) { throw new IOException("LlapOutputFormatService: Error binding to port " + portFromConf, err); } @@ -154,6 +157,11 @@ public int getPort() { } protected class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler { + private final int sendBufferSize; + public LlapOutputFormatServiceHandler(final int sendBufferSize) { + this.sendBufferSize = sendBufferSize; + } + @Override public void channelRead0(ChannelHandlerContext ctx, String msg) { String id = msg; @@ -162,9 +170,8 @@ public void channelRead0(ChannelHandlerContext ctx, String msg) { private void registerReader(ChannelHandlerContext ctx, String id) { synchronized(INSTANCE) { - LOG.debug("registering socket for: "+id); - int bufSize = 128 * 1024; // configable? - OutputStream stream = new ChannelOutputStream(ctx, id, bufSize); + LOG.debug("registering socket for: " + id); + OutputStream stream = new ChannelOutputStream(ctx, id, sendBufferSize); LlapRecordWriter writer = new LlapRecordWriter(stream); writers.put(id, writer); @@ -198,13 +205,18 @@ public void operationComplete(ChannelFuture future) throws Exception { } protected class LlapOutputFormatServiceChannelHandler extends ChannelInitializer { + private final int sendBufferSize; + public LlapOutputFormatServiceChannelHandler(final int sendBufferSize) { + this.sendBufferSize = sendBufferSize; + } + @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new DelimiterBasedFrameDecoder(MAX_QUERY_ID_LENGTH, Delimiters.nulDelimiter()), new StringDecoder(), new StringEncoder(), - new LlapOutputFormatServiceHandler()); + new LlapOutputFormatServiceHandler(sendBufferSize)); } } }