diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 858c630..76d05f9 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2653,6 +2653,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     // For Arrow SerDe
     HIVE_ARROW_ROOT_ALLOCATOR_LIMIT("hive.arrow.root.allocator.limit", Long.MAX_VALUE,
         "Arrow root allocator memory size limitation in bytes."),
+    HIVE_ARROW_BATCH_ALLOCATOR_LIMIT("hive.arrow.batch.allocator.limit", 10_000_000_000L,
+        "Max bytes per Arrow batch. This is a threshold; the memory is not pre-allocated."),
     HIVE_ARROW_BATCH_SIZE("hive.arrow.batch.size", 1000, "The number of rows sent in one Arrow batch."),

     // For Druid storage handler
diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
index 1b3a3eb..f93cfad 100644
--- ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
+++ ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
@@ -20,9 +20,11 @@
 import java.io.IOException;

+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.arrow.vector.complex.NullableMapVector;
 import org.apache.hadoop.io.Writable;
 import java.nio.channels.WritableByteChannel;
 import org.apache.hadoop.mapred.RecordWriter;
@@ -48,6 +50,8 @@
   ArrowStreamWriter arrowStreamWriter;
   WritableByteChannel out;
+  BufferAllocator allocator;
+  NullableMapVector rootVector;

   public LlapArrowRecordWriter(WritableByteChannel out) {
     this.out = out;
   }
@@ -55,7 +59,18 @@ public LlapArrowRecordWriter(WritableByteChannel out) {

   @Override
   public void close(Reporter reporter) throws IOException {
-    arrowStreamWriter.close();
+    try {
+      arrowStreamWriter.close();
+    } finally {
+      rootVector.close();
+      // bytesLeaked should always be 0
+      long bytesLeaked = allocator.getAllocatedMemory();
+      if (bytesLeaked != 0) {
+        LOG.error("Arrow memory leaked bytes: {}", bytesLeaked);
+        throw new IllegalStateException("Arrow memory leaked bytes: " + bytesLeaked);
+      }
+      allocator.close();
+    }
   }

   @Override
@@ -64,6 +79,8 @@ public void write(K key, V value) throws IOException {
     if (arrowStreamWriter == null) {
       VectorSchemaRoot vectorSchemaRoot = arrowWrapperWritable.getVectorSchemaRoot();
       arrowStreamWriter = new ArrowStreamWriter(vectorSchemaRoot, null, out);
+      allocator = arrowWrapperWritable.getAllocator();
+      rootVector = arrowWrapperWritable.getRootVector();
     }
     arrowStreamWriter.writeBatch();
   }
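Note: the close() change above is the leak check in miniature — release the root vector, assert the child allocator's accounted bytes are zero, then close it. A minimal standalone sketch of that pattern against the Arrow allocator API (the class name LeakCheckSketch is ours; this assumes a recent Arrow where ArrowBuf implements AutoCloseable — on older netty-based Arrow versions, release() replaces close()):

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

public class LeakCheckSketch {
  public static void main(String[] args) {
    try (BufferAllocator root = new RootAllocator(Long.MAX_VALUE)) {
      // Zero reservation: the child only accounts for memory, it sets none aside.
      BufferAllocator child = root.newChildAllocator("task-thread", 0L, 10_000_000_000L);
      child.buffer(1024).close();       // allocate one buffer and release it
      long bytesLeaked = child.getAllocatedMemory();
      if (bytesLeaked != 0) {           // should always be 0, as in close() above
        throw new IllegalStateException("Arrow memory leaked bytes: " + bytesLeaked);
      }
      child.close();                    // itself throws if anything were still held
    }
  }
}
```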
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
index dd490b1..40813fa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
@@ -18,8 +18,10 @@
 package org.apache.hadoop.hive.ql.io.arrow;

+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.arrow.vector.complex.NullableMapVector;

 import java.io.DataInput;
 import java.io.DataOutput;
@@ -27,10 +29,19 @@ public class ArrowWrapperWritable implements WritableComparable {
   private VectorSchemaRoot vectorSchemaRoot;
+  private BufferAllocator allocator;
+  private NullableMapVector rootVector;

   public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot) {
     this.vectorSchemaRoot = vectorSchemaRoot;
   }
+
+  public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot, BufferAllocator allocator, NullableMapVector rootVector) {
+    this.vectorSchemaRoot = vectorSchemaRoot;
+    this.allocator = allocator;
+    this.rootVector = rootVector;
+  }
+
   public ArrowWrapperWritable() {}

   public VectorSchemaRoot getVectorSchemaRoot() {
@@ -41,6 +52,14 @@ public void setVectorSchemaRoot(VectorSchemaRoot vectorSchemaRoot) {
     this.vectorSchemaRoot = vectorSchemaRoot;
   }

+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
+  public NullableMapVector getRootVector() {
+    return rootVector;
+  }
+
   @Override
   public void write(DataOutput dataOutput) throws IOException {
     throw new UnsupportedOperationException();
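Note: ArrowWrapperWritable now carries the live allocator and root vector across the serializer/writer boundary, so the object that finishes the stream can also release the memory. A sketch of that intended lifecycle (assuming the Arrow version Hive builds against here, where NullableMapVector still exists; both factory calls are the ones the patch itself uses):

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.NullableMapVector;

public class HandOffSketch {
  public static void main(String[] args) {
    BufferAllocator root = new RootAllocator(Long.MAX_VALUE);
    // Producer side (Serializer): create the child allocator and root vector once.
    BufferAllocator child = root.newChildAllocator("task-thread", 0L, Long.MAX_VALUE);
    NullableMapVector rootVector = NullableMapVector.empty(null, child);
    VectorSchemaRoot schemaRoot = new VectorSchemaRoot(rootVector);
    System.out.println(schemaRoot.getSchema());
    // ... batches would be written through an ArrowStreamWriter here ...
    // Consumer side (LlapArrowRecordWriter.close()): release in the same order.
    rootVector.close();
    child.close();
    root.close();
  }
}
```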
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
index 2961050..e8a16ce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -69,11 +69,14 @@
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.memory.BufferAllocator;

 import java.util.ArrayList;
 import java.util.List;

 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_ALLOCATOR_LIMIT;
 import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MICROS_PER_MILLIS;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MILLIS_PER_SECOND;
@@ -96,18 +99,27 @@
   private final VectorizedRowBatch vectorizedRowBatch;
   private final VectorAssignRow vectorAssignRow;
   private int batchSize;
+  private BufferAllocator allocator;
   private final NullableMapVector rootVector;

   Serializer(ArrowColumnarBatchSerDe serDe) throws SerDeException {
     MAX_BUFFERED_ROWS = HiveConf.getIntVar(serDe.conf, HIVE_ARROW_BATCH_SIZE);
+    long childAllocatorLimit = HiveConf.getLongVar(serDe.conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT);
     ArrowColumnarBatchSerDe.LOG.info("ArrowColumnarBatchSerDe max number of buffered columns: " + MAX_BUFFERED_ROWS);
+    String childAllocatorName = Thread.currentThread().getName();
+    // Use the per-task allocator for accounting only; no need to reserve per-task memory
+    long childAllocatorReservation = 0L;
+    // Break out accounting of direct memory per task, so we can check that no memory is leaked when the task completes
+    allocator = serDe.rootAllocator.newChildAllocator(
+        childAllocatorName,
+        childAllocatorReservation,
+        childAllocatorLimit);

     // Schema
     structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector);
     List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
     fieldSize = fieldTypeInfos.size();
-
     // Init Arrow stuffs
     rootVector = NullableMapVector.empty(null, allocator);
@@ -146,7 +158,7 @@ private ArrowWrapperWritable serializeBatch() {
     batchSize = 0;
     VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector);
-    return new ArrowWrapperWritable(vectorSchemaRoot);
+    return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector);
   }

   private FieldType toFieldType(TypeInfo typeInfo) {
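Note: one property of the new config worth calling out — hive.arrow.batch.allocator.limit is only a cap that newChildAllocator enforces at allocation time; nothing is set aside when the Serializer is constructed. A small sketch demonstrating that behavior (the 1024-byte cap and class name BatchLimitSketch are arbitrary values for illustration):

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.memory.RootAllocator;

public class BatchLimitSketch {
  public static void main(String[] args) {
    try (BufferAllocator root = new RootAllocator(Long.MAX_VALUE);
         BufferAllocator child = root.newChildAllocator("batch", 0L, 1024L)) {
      // Nothing is pre-allocated: the limit is a threshold, not a reservation.
      System.out.println(child.getAllocatedMemory());   // prints 0
      try {
        child.buffer(4096);                             // over the 1024-byte cap
      } catch (OutOfMemoryException e) {
        System.out.println("allocation rejected by the child limit");
      }
    }
  }
}
```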