diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4ed1636..1ec5918 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2653,6 +2653,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     // For Arrow SerDe
     HIVE_ARROW_ROOT_ALLOCATOR_LIMIT("hive.arrow.root.allocator.limit", Long.MAX_VALUE,
         "Arrow root allocator memory size limitation in bytes."),
+    HIVE_ARROW_BATCH_ALLOCATOR_LIMIT("hive.arrow.batch.allocator.limit", 10_000_000_000L,
+        "Max bytes per Arrow batch. This is a threshold; the memory is not pre-allocated."),
     HIVE_ARROW_BATCH_SIZE("hive.arrow.batch.size", 1000, "The number of rows sent in one Arrow batch."),

     // For Druid storage handler
diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
index 1b3a3eb..9ee1048 100644
--- ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
+++ ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
@@ -20,11 +20,12 @@
 import java.io.IOException;

+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.arrow.vector.complex.NullableMapVector;
 import org.apache.hadoop.io.Writable;
-import java.nio.channels.WritableByteChannel;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.slf4j.Logger;
@@ -47,15 +48,28 @@
   public static final Logger LOG = LoggerFactory.getLogger(LlapArrowRecordWriter.class);

   ArrowStreamWriter arrowStreamWriter;
-  WritableByteChannel out;
+  WritableByteChannelAdapter out;
+  BufferAllocator allocator;
+  NullableMapVector rootVector;

-  public LlapArrowRecordWriter(WritableByteChannel out) {
+  public LlapArrowRecordWriter(WritableByteChannelAdapter out) {
     this.out = out;
   }

   @Override
   public void close(Reporter reporter) throws IOException {
-    arrowStreamWriter.close();
+    try {
+      arrowStreamWriter.close();
+    } finally {
+      rootVector.close();
+      //bytesLeaked should always be 0
+      long bytesLeaked = allocator.getAllocatedMemory();
+      if(bytesLeaked != 0) {
+        LOG.error("Arrow memory leaked bytes: {}", bytesLeaked);
+        throw new IllegalStateException("Arrow memory leaked bytes:" + bytesLeaked);
+      }
+      allocator.close();
+    }
   }

   @Override
@@ -64,6 +78,9 @@ public void write(K key, V value) throws IOException {
     if (arrowStreamWriter == null) {
       VectorSchemaRoot vectorSchemaRoot = arrowWrapperWritable.getVectorSchemaRoot();
       arrowStreamWriter = new ArrowStreamWriter(vectorSchemaRoot, null, out);
+      allocator = arrowWrapperWritable.getAllocator();
+      this.out.setAllocator(allocator);
+      rootVector = arrowWrapperWritable.getRootVector();
     }
     arrowStreamWriter.writeBatch();
   }
diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 996f8b3..c71c637 100644
--- ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -199,12 +199,10 @@ private void registerReader(ChannelHandlerContext ctx, String id, byte[] tokenBy
     int maxPendingWrites = HiveConf.getIntVar(conf,
         HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
     boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW);
-    long allocatorMax = HiveConf.getLongVar(conf,
-        HiveConf.ConfVars.HIVE_ARROW_ROOT_ALLOCATOR_LIMIT);
     @SuppressWarnings("rawtypes")
     RecordWriter writer = null;
     if(useArrow) {
-      writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id, allocatorMax));
+      writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id));
     } else {
       writer = new LlapRecordWriter(id,
           new ChunkedOutputStream(
diff --git ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
index 753da22..b07ce5b 100644
--- ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
+++ ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.llap;

-import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
@@ -26,6 +25,7 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.util.concurrent.Semaphore;
+import org.apache.arrow.memory.BufferAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +49,7 @@
   private final Semaphore writeResources;
   private boolean closed = false;
   private final String id;
-  private long allocatorMax;
+  private BufferAllocator allocator;

   private ChannelFutureListener writeListener = new ChannelFutureListener() {
     @Override
@@ -77,12 +77,15 @@ public void operationComplete(ChannelFuture future) {
     }
   };

-  public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id, long allocatorMax) {
+  public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id) {
     this.chc = chc;
     this.maxPendingWrites = maxPendingWrites;
     this.writeResources = new Semaphore(maxPendingWrites);
     this.id = id;
-    this.allocatorMax = allocatorMax;
+  }
+
+  public void setAllocator(BufferAllocator allocator) {
+    this.allocator = allocator;
   }

   @Override
@@ -90,7 +93,7 @@ public int write(ByteBuffer src) throws IOException {
     int size = src.remaining();
     //Down the semaphore or block until available
     takeWriteResources(1);
-    ByteBuf buf = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(allocatorMax).buffer(size);
+    ByteBuf buf = allocator.buffer(size);
     buf.writeBytes(src);
     chc.writeAndFlush(buf).addListener(writeListener);
     return size;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
index dd490b1..40813fa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
@@ -18,8 +18,10 @@
 package org.apache.hadoop.hive.ql.io.arrow;

+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.arrow.vector.complex.NullableMapVector;

 import java.io.DataInput;
 import java.io.DataOutput;
@@ -27,10 +29,19 @@
 public class ArrowWrapperWritable implements WritableComparable {
   private VectorSchemaRoot vectorSchemaRoot;
+  private BufferAllocator allocator;
+  private NullableMapVector rootVector;

   public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot) {
     this.vectorSchemaRoot = vectorSchemaRoot;
   }
+
+  public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot, BufferAllocator allocator, NullableMapVector rootVector) {
+    this.vectorSchemaRoot = vectorSchemaRoot;
+    this.allocator = allocator;
+    this.rootVector = rootVector;
+  }
+
   public ArrowWrapperWritable() {}

   public VectorSchemaRoot getVectorSchemaRoot() {
@@ -41,6 +52,14 @@ public void setVectorSchemaRoot(VectorSchemaRoot vectorSchemaRoot) {
     this.vectorSchemaRoot = vectorSchemaRoot;
   }

+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
+  public NullableMapVector getRootVector() {
+    return rootVector;
+  }
+
   @Override
   public void write(DataOutput dataOutput) throws IOException {
     throw new UnsupportedOperationException();
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
index 2961050..65a889e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -69,11 +69,13 @@
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.arrow.memory.BufferAllocator;

 import java.util.ArrayList;
 import java.util.List;

 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_ALLOCATOR_LIMIT;
 import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MICROS_PER_MILLIS;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MILLIS_PER_SECOND;
@@ -96,20 +98,29 @@
   private final VectorizedRowBatch vectorizedRowBatch;
   private final VectorAssignRow vectorAssignRow;
   private int batchSize;
+  private BufferAllocator allocator;
   private final NullableMapVector rootVector;

   Serializer(ArrowColumnarBatchSerDe serDe) throws SerDeException {
     MAX_BUFFERED_ROWS = HiveConf.getIntVar(serDe.conf, HIVE_ARROW_BATCH_SIZE);
+    long childAllocatorLimit = HiveConf.getLongVar(serDe.conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT);
     ArrowColumnarBatchSerDe.LOG.info("ArrowColumnarBatchSerDe max number of buffered columns: " + MAX_BUFFERED_ROWS);
+    String childAllocatorName = Thread.currentThread().getName();
+    //Use per-task allocator for accounting only, no need to reserve per-task memory
+    long childAllocatorReservation = 0L;
+    //Break out accounting of direct memory per-task, so we can check no memory is leaked when task is completed
+    allocator = serDe.rootAllocator.newChildAllocator(
+        childAllocatorName,
+        childAllocatorReservation,
+        childAllocatorLimit);

     // Schema
     structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector);
     List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
     fieldSize = fieldTypeInfos.size();

-    // Init Arrow stuffs
-    rootVector = NullableMapVector.empty(null, serDe.rootAllocator);
+    rootVector = NullableMapVector.empty(null, allocator);

     // Init Hive stuffs
     vectorizedRowBatch = new VectorizedRowBatch(fieldSize);
@@ -146,7 +157,7 @@
   private ArrowWrapperWritable serializeBatch() {
     batchSize = 0;
     VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector);
-    return new ArrowWrapperWritable(vectorSchemaRoot);
+    return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector);
   }

   private FieldType toFieldType(TypeInfo typeInfo) {
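
For reviewers unfamiliar with Arrow's allocator tree, here is a minimal standalone sketch (not part of the patch) of the accounting pattern the Serializer change relies on. It assumes the Arrow Java allocator API (RootAllocator, BufferAllocator.newChildAllocator, getAllocatedMemory); the class name and limit values are illustrative, and the exact ArrowBuf release call and package differ across Arrow versions.

    import org.apache.arrow.memory.ArrowBuf;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;

    public class ChildAllocatorSketch {
      public static void main(String[] args) {
        // One root allocator per process, sized like hive.arrow.root.allocator.limit.
        try (BufferAllocator root = new RootAllocator(Long.MAX_VALUE)) {
          // Per-task child: a zero reservation means accounting only, nothing is
          // pre-allocated; the limit plays the role of hive.arrow.batch.allocator.limit.
          BufferAllocator child = root.newChildAllocator("task-thread-name", 0L, 10_000_000_000L);
          ArrowBuf buf = child.buffer(1024); // accounted against the child and its parent
          buf.close();                       // release the reference-counted buffer
          // The same check LlapArrowRecordWriter.close() performs before closing the child.
          long bytesLeaked = child.getAllocatedMemory();
          if (bytesLeaked != 0) {
            throw new IllegalStateException("Arrow memory leaked bytes: " + bytesLeaked);
          }
          child.close();
        }
      }
    }

The handoff works because the child allocator travels with each ArrowWrapperWritable: the Serializer that creates the allocator never needs to know when the last batch is drained, and the record writer that does drain it both verifies and closes it.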
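
Conversely, a sketch of the failure mode the bytesLeaked check is meant to surface (again illustrative, not part of the patch): an unreleased buffer keeps getAllocatedMemory() non-zero, and closing such an allocator throws, which is why close() reports the leaked byte count itself rather than letting allocator.close() fail with a less specific error.

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;

    public class LeakSketch {
      public static void main(String[] args) {
        BufferAllocator root = new RootAllocator(Long.MAX_VALUE);
        BufferAllocator child = root.newChildAllocator("leaky-task", 0L, 1 << 20);
        child.buffer(512);                              // allocated but never released: a leak
        System.out.println(child.getAllocatedMemory()); // 512: the leak shows up in accounting
        child.close();  // throws IllegalStateException: closed with outstanding buffers allocated
      }
    }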