diff --git llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
index d9c5666..014e49d 100644
--- llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
+++ llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
@@ -39,11 +39,19 @@
   private BufferAllocator allocator;
   private ArrowStreamReader arrowStreamReader;
 
+  //Allows client to provide and manage their own arrow BufferAllocator
   public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class clazz,
-      JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException {
+      JobConf job, Closeable client, Socket socket, BufferAllocator allocator) throws IOException {
     super(in, schema, clazz, job, client, socket);
-    allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit);
+    this.allocator = allocator;
     this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), allocator);
+  }
+
+  //Use the global arrow BufferAllocator
+  public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class clazz,
+      JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException {
+    this(in, schema, clazz, job, client, socket,
+        RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit));
   }
 
   @Override
@@ -76,6 +84,9 @@ public boolean next(NullWritable key, ArrowWrapperWritable value) throws IOExcep
   @Override
   public void close() throws IOException {
     arrowStreamReader.close();
+    //allocator.close() will throw exception unless all buffers have been released
+    //See org.apache.arrow.memory.BaseAllocator.close()
+    allocator.close();
   }
 
 }
diff --git llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
index fafbdee..7690599 100644
--- llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
+++ llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
@@ -25,16 +25,28 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import java.io.IOException;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
+import java.util.UUID;
 
 /*
  * Adapts an Arrow batch reader to a row reader
+ * Only used for testing
  */
 public class LlapArrowRowInputFormat implements InputFormat<NullWritable, Row> {
 
   private LlapBaseInputFormat baseInputFormat;
 
   public LlapArrowRowInputFormat(long arrowAllocatorLimit) {
-    baseInputFormat = new LlapBaseInputFormat(true, arrowAllocatorLimit);
+    BufferAllocator allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit).newChildAllocator(
+        //allocator name, use UUID for testing
+        UUID.randomUUID().toString(),
+        //No use for reservation, allocators claim memory from the same pool,
+        //but allocate/releases are tracked per-allocator
+        0,
+        //Limit passed in by client
+        arrowAllocatorLimit);
+    baseInputFormat = new LlapBaseInputFormat(true, allocator);
   }
 
   @Override
diff --git llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index ef03be6..5677eb7 100644
--- llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -65,6 +65,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.security.Credentials;
@@ -106,6 +107,7 @@
   private String query;
   private boolean useArrow;
   private long arrowAllocatorLimit;
+  private BufferAllocator allocator;
   private final Random rand = new Random();
 
   public static final String URL_KEY = "llap.if.hs2.connection";
@@ -125,11 +127,17 @@ public LlapBaseInputFormat(String url, String user, String pwd, String query) {
     this.query = query;
   }
 
+  //Exposed only for testing, clients should use LlapBaseInputFormat(boolean, BufferAllocator) instead
   public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) {
     this.useArrow = useArrow;
     this.arrowAllocatorLimit = arrowAllocatorLimit;
   }
 
+  public LlapBaseInputFormat(boolean useArrow, BufferAllocator allocator) {
+    this.useArrow = useArrow;
+    this.allocator = allocator;
+  }
+
   public LlapBaseInputFormat() {
     this.useArrow = false;
   }
@@ -206,10 +214,19 @@ public LlapBaseInputFormat() {
     @SuppressWarnings("rawtypes")
     LlapBaseRecordReader recordReader;
     if(useArrow) {
-      recordReader = new LlapArrowBatchRecordReader(
-          socket.getInputStream(), llapSplit.getSchema(),
-          ArrowWrapperWritable.class, job, llapClient, socket,
-          arrowAllocatorLimit);
+      if(allocator != null) {
+        //Client provided their own allocator
+        recordReader = new LlapArrowBatchRecordReader(
+            socket.getInputStream(), llapSplit.getSchema(),
+            ArrowWrapperWritable.class, job, llapClient, socket,
+            allocator);
+      } else {
+        //Client did not provide their own allocator, use constructor for global allocator
+        recordReader = new LlapArrowBatchRecordReader(
+            socket.getInputStream(), llapSplit.getSchema(),
+            ArrowWrapperWritable.class, job, llapClient, socket,
+            arrowAllocatorLimit);
+      }
     } else {
       recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(),
           BytesWritable.class, job, llapClient, (java.io.Closeable)socket);
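
For context, the sketch below shows how a client could drive the new LlapBaseInputFormat(boolean, BufferAllocator) constructor end to end. It is not part of the patch: the class name, allocator name, JDBC URL, and 64 MB limit are illustrative, and the remaining connection settings (user, password, query) are assumed to be configured on the JobConf elsewhere. The child-allocator construction mirrors what LlapArrowRowInputFormat does above.

// LlapArrowAllocatorExample.java -- illustrative sketch only, not part of this patch
import org.apache.arrow.memory.BufferAllocator;
import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class LlapArrowAllocatorExample {

  @SuppressWarnings({"rawtypes", "unchecked"})
  public static void main(String[] args) throws Exception {
    long arrowAllocatorLimit = 64L * 1024 * 1024;  //illustrative 64 MB cap

    //Carve a client-owned child allocator out of the shared root allocator,
    //mirroring LlapArrowRowInputFormat above
    BufferAllocator allocator = RootAllocatorFactory.INSTANCE
        .getOrCreateRootAllocator(arrowAllocatorLimit)
        .newChildAllocator("llap-arrow-client-example", 0, arrowAllocatorLimit);

    //New constructor: Arrow output with a client-provided allocator
    LlapBaseInputFormat inputFormat = new LlapBaseInputFormat(true, allocator);

    JobConf job = new JobConf();
    job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://localhost:10000/");
    //...user, password and query settings are assumed to be set here as well

    InputSplit[] splits = inputFormat.getSplits(job, 1);
    if (splits.length > 0) {
      RecordReader reader = inputFormat.getRecordReader(splits[0], job, Reporter.NULL);
      NullWritable key = (NullWritable) reader.createKey();
      ArrowWrapperWritable value = (ArrowWrapperWritable) reader.createValue();
      while (reader.next(key, value)) {
        //consume the Arrow batch wrapped by value here
      }
      //Per the patch, LlapArrowBatchRecordReader.close() also closes the allocator
      //passed in, and it will throw if any Arrow buffers are still outstanding
      reader.close();
    }
  }
}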