diff --git llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowInput.java llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowInput.java
new file mode 100644
index 0000000..4932644
--- /dev/null
+++ llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowInput.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.util.List;
+
+/**
+ * Record reader that decodes LLAP daemon output as an Arrow stream.
+ * Callers drive it with {@link #loadNextBatch()} and read the current
+ * batch via {@link #getCurrentFieldVectors()}.
+ */
+public class LlapArrowInput extends LlapBaseRecordReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapArrowInput.class);
+  // One JVM-wide allocator shared by all readers; limit is effectively unbounded.
+  private static final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+  private final ArrowStreamReader arrowStreamReader;
+
+  public LlapArrowInput(InputStream in, Schema schema, Class clazz, JobConf job,
+      Closeable client, Socket socket) throws IOException {
+    super(in, schema, clazz, job, client, socket);
+    this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), allocator);
+  }
+
+  /** Returns the field vectors of the batch most recently loaded by {@link #loadNextBatch()}. */
+  public List<FieldVector> getCurrentFieldVectors() throws IOException {
+    return arrowStreamReader.getVectorSchemaRoot().getFieldVectors();
+  }
+
+  /** Loads the next Arrow record batch; returns false once the stream is exhausted. */
+  public boolean loadNextBatch() throws IOException {
+    return arrowStreamReader.loadNextBatch();
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      arrowStreamReader.close();
+    } finally {
+      // Release the socket/client resources held by the base reader as well.
+      super.close();
+    }
+  }
+
+  @Override
+  public void handleEvent(LlapBaseRecordReader.ReaderEvent event) {
+    switch (event.getEventType()) {
+    case DONE:
+      readerEvents.add(event);
+      break;
+    case ERROR:
+      readerEvents.add(event);
+      if (readerThread == null) {
+        throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error "
+            + event.getMessage());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage());
+      }
+      readerThread.interrupt();
+      try {
+        close();
+      } catch (IOException e) {
+        LOG.error("Cannot close the socket on error", e);
+      }
+      break;
+    default:
+      throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType()
+          + " with message " + event.getMessage());
+    }
+  }
+}
diff --git llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index f4c7fa4..468ec94 100644
--- llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -104,6 +104,8 @@
   private String user; // "hive",
   private String pwd; // ""
   private String query;
+  // When true, getRecordReader returns an Arrow-based reader (LlapArrowInput).
+  private boolean useArrow;
   private final Random rand = new Random();
 
   public static final String URL_KEY = "llap.if.hs2.connection";
@@ -123,7 +125,14 @@
     this.query = query;
   }
 
-  public LlapBaseInputFormat() {}
+  public LlapBaseInputFormat() {
+    this(false);
+  }
+
+  /** @param useArrow whether record readers should decode the daemon output as an Arrow stream */
+  public LlapBaseInputFormat(boolean useArrow) {
+    this.useArrow = useArrow;
+  }
 
   @SuppressWarnings("unchecked")
@@ -195,8 +204,14 @@
     LOG.info("Registered id: " + fragmentId);
 
     @SuppressWarnings("rawtypes")
-    LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(),
-        llapSplit.getSchema(), Text.class, job, llapClient, (java.io.Closeable)socket);
+    LlapBaseRecordReader recordReader;
+    if (useArrow) {
+      recordReader = new LlapArrowInput(socket.getInputStream(),
+          llapSplit.getSchema(), NullWritable.class, job, llapClient, socket);
+    } else {
+      recordReader = new LlapBaseRecordReader(socket.getInputStream(),
+          llapSplit.getSchema(), Text.class, job, llapClient, (java.io.Closeable) socket);
+    }
     umbilicalResponder.setRecordReader(recordReader);
     return recordReader;
   }
diff --git llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
index aee5502..910cb53 100644
--- llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
+++ llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
@@ -38,7 +38,7 @@ public class LlapRowInputFormat implements InputFormat {
 
-  private LlapBaseInputFormat baseInputFormat = new LlapBaseInputFormat();
+  private LlapBaseInputFormat baseInputFormat = new LlapBaseInputFormat(false);
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {