diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java index c71c637..70ffd17 100644 --- ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java +++ ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -199,10 +199,12 @@ private void registerReader(ChannelHandlerContext ctx, String id, byte[] tokenBy int maxPendingWrites = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES); boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW); + long allocatorMax = HiveConf.getLongVar(conf, + HiveConf.ConfVars.HIVE_ARROW_ROOT_ALLOCATOR_LIMIT); @SuppressWarnings("rawtypes") RecordWriter writer = null; if(useArrow) { - writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id)); + writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id, allocatorMax)); } else { writer = new LlapRecordWriter(id, new ChunkedOutputStream( diff --git ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java index 57da1d9..753da22 100644 --- ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java +++ ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java @@ -18,7 +18,8 @@ package org.apache.hadoop.hive.llap; -import io.netty.buffer.Unpooled; +import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import java.io.IOException; @@ -48,6 +49,7 @@ private final Semaphore writeResources; private boolean closed = false; private final String id; + private long allocatorMax; private ChannelFutureListener writeListener = new ChannelFutureListener() { @Override @@ -75,11 +77,12 @@ public void operationComplete(ChannelFuture future) { } }; - public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id) { + public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id, long allocatorMax) { this.chc = chc; this.maxPendingWrites = maxPendingWrites; this.writeResources = new Semaphore(maxPendingWrites); this.id = id; + this.allocatorMax = allocatorMax; } @Override @@ -87,7 +90,9 @@ public int write(ByteBuffer src) throws IOException { int size = src.remaining(); //Down the semaphore or block until available takeWriteResources(1); - chc.writeAndFlush(Unpooled.wrappedBuffer(src)).addListener(writeListener); + ByteBuf buf = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(allocatorMax).buffer(size); + buf.writeBytes(src); + chc.writeAndFlush(buf).addListener(writeListener); return size; }