diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b12a7a4d40..c14caf658f 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4166,7 +4166,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         Constants.LLAP_LOGGER_NAME_RFA,
         Constants.LLAP_LOGGER_NAME_CONSOLE),
       "logger used for llap-daemons."),
-    LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", false,
+    LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", true,
       "Whether LLapOutputFormatService should output arrow batches"),
 
     HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms",
diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
index 7e35fefedb..ed0f01243c 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
@@ -95,8 +95,14 @@
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.apache.hadoop.mapred.InputFormat;
 
-public class TestJdbcWithMiniLlap {
+/*
+ * Specialize this base class for different serdes/formats.
+ * {@link #beforeTest(boolean) beforeTest} should be called
+ * by sub-classes in a {@link org.junit.BeforeClass} initializer.
+ */
+public abstract class TestJdbcWithMiniLlap {
   private static MiniHS2 miniHS2 = null;
   private static String dataFileDir;
   private static Path kvDataFilePath;
@@ -105,8 +111,8 @@
   private static HiveConf conf = null;
   private Connection hs2Conn = null;
 
-  @BeforeClass
-  public static void beforeTest() throws Exception {
+  // This method should be called by sub-classes in a @BeforeClass initializer
+  public static void beforeTest(boolean useArrow) throws Exception {
     Class.forName(MiniHS2.getJdbcDriverName());
 
     String confDir = "../../data/conf/llap/";
@@ -118,6 +124,11 @@
     conf = new HiveConf();
     conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    if (useArrow) {
+      conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+    } else {
+      conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
+    }
 
     conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
       + "/tez-site.xml"));
@@ -184,7 +195,7 @@ public static void createTestTable(Connection connection, String database, Strin
     stmt.close();
   }
 
-  private void createDataTypesTable(String tableName) throws Exception {
+  protected void createDataTypesTable(String tableName) throws Exception {
     Statement stmt = hs2Conn.createStatement();
 
     // create table
@@ -456,7 +467,7 @@ public void testComplexQuery() throws Exception {
     void process(Row row);
   }
 
-  private static class RowCollector implements RowProcessor {
+  protected static class RowCollector implements RowProcessor {
     ArrayList rows = new ArrayList();
     Schema schema = null;
     int numColumns = 0;
@@ -477,7 +488,7 @@ public void process(Row row) {
   }
 
   // Save the actual values from each row as opposed to the String representation.
-  private static class RowCollector2 implements RowProcessor {
+  protected static class RowCollector2 implements RowProcessor {
     ArrayList rows = new ArrayList();
     Schema schema = null;
     int numColumns = 0;
@@ -496,17 +507,19 @@ public void process(Row row) {
     }
   }
 
-  private int processQuery(String query, int numSplits, RowProcessor rowProcessor) throws Exception {
+  protected int processQuery(String query, int numSplits, RowProcessor rowProcessor) throws Exception {
     return processQuery(null, query, numSplits, rowProcessor);
   }
 
+  protected abstract InputFormat getInputFormat();
+
   private int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor)
       throws Exception {
     String url = miniHS2.getJdbcURL();
     String user = System.getProperty("user.name");
     String pwd = user;
     String handleId = UUID.randomUUID().toString();
 
-    LlapRowInputFormat inputFormat = new LlapRowInputFormat();
+    InputFormat inputFormat = getInputFormat();
 
     // Get splits
     JobConf job = new JobConf(conf);
diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
new file mode 100644
index 0000000000..87706f8c22
--- /dev/null
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.List;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.BeforeClass;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+
+/*
+ * TestJdbcWithMiniLlap for Arrow format
+ */
+public class TestJdbcWithMiniLlapArrow extends TestJdbcWithMiniLlap {
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    TestJdbcWithMiniLlap.beforeTest(true);
+  }
+
+  @Override
+  protected InputFormat getInputFormat() {
+    //For unit testing, no harm in hard-coding allocator ceiling to LONG.MAX_VALUE
+    return new LlapArrowRowInputFormat(Long.MAX_VALUE);
+  }
+
+  // TODO: Currently MAP type is not supported. Add it back when Arrow 1.0 is released.
+  // See: SPARK-21187
+  @Override
+  public void testDataTypes() throws Exception {
+    createDataTypesTable("datatypes");
+    RowCollector2 rowCollector = new RowCollector2();
+    String query = "select * from datatypes";
+    int rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(3, rowCount);
+
+    // Verify schema
+    String[][] colNameTypes = new String[][] {
+        {"datatypes.c1", "int"},
+        {"datatypes.c2", "boolean"},
+        {"datatypes.c3", "double"},
+        {"datatypes.c4", "string"},
+        {"datatypes.c5", "array<int>"},
+        {"datatypes.c6", "map<int,string>"},
+        {"datatypes.c7", "map<string,string>"},
+        {"datatypes.c8", "struct<r:string,s:int,t:double>"},
+        {"datatypes.c9", "tinyint"},
+        {"datatypes.c10", "smallint"},
+        {"datatypes.c11", "float"},
+        {"datatypes.c12", "bigint"},
+        {"datatypes.c13", "array<array<string>>"},
+        {"datatypes.c14", "map<int,map<int,int>>"},
+        {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"},
+        {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"},
+        {"datatypes.c17", "timestamp"},
+        {"datatypes.c18", "decimal(16,7)"},
+        {"datatypes.c19", "binary"},
+        {"datatypes.c20", "date"},
+        {"datatypes.c21", "varchar(20)"},
+        {"datatypes.c22", "char(15)"},
+        {"datatypes.c23", "binary"},
+    };
+    FieldDesc fieldDesc;
+    assertEquals(23, rowCollector.numColumns);
+    for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+      fieldDesc = rowCollector.schema.getColumns().get(idx);
+      assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName());
+      assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName());
+    }
+
+    // First row is all nulls
+    Object[] rowValues = rowCollector.rows.get(0);
+    for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+      assertEquals("idx=" + idx, null, rowValues[idx]);
+    }
+
+    // Second Row
+    rowValues = rowCollector.rows.get(1);
+    assertEquals(Integer.valueOf(-1), rowValues[0]);
+    assertEquals(Boolean.FALSE, rowValues[1]);
+    assertEquals(Double.valueOf(-1.1d), rowValues[2]);
+    assertEquals("", rowValues[3]);
+
+    List c5Value = (List) rowValues[4];
+    assertEquals(0, c5Value.size());
+
+    //Map c6Value = (Map) rowValues[5];
+    //assertEquals(0, c6Value.size());
+
+    //Map c7Value = (Map) rowValues[6];
+    //assertEquals(0, c7Value.size());
+
+    List c8Value = (List) rowValues[7];
+    assertEquals(null, c8Value.get(0));
+    assertEquals(null, c8Value.get(1));
+    assertEquals(null, c8Value.get(2));
+
+    assertEquals(Byte.valueOf((byte) -1), rowValues[8]);
+    assertEquals(Short.valueOf((short) -1), rowValues[9]);
+    assertEquals(Float.valueOf(-1.0f), rowValues[10]);
+    assertEquals(Long.valueOf(-1l), rowValues[11]);
+
+    List c13Value = (List) rowValues[12];
+    assertEquals(0, c13Value.size());
+
+    //Map c14Value = (Map) rowValues[13];
+    //assertEquals(0, c14Value.size());
+
+    List c15Value = (List) rowValues[14];
+    assertEquals(null, c15Value.get(0));
+    assertEquals(null, c15Value.get(1));
+
+    //List c16Value = (List) rowValues[15];
+    //assertEquals(0, c16Value.size());
+
+    assertEquals(null, rowValues[16]);
+    assertEquals(null, rowValues[17]);
+    assertEquals(null, rowValues[18]);
+    assertEquals(null, rowValues[19]);
+    assertEquals(null, rowValues[20]);
+    assertEquals(null, rowValues[21]);
+    assertEquals(null, rowValues[22]);
+
+    // Third row
+    rowValues = rowCollector.rows.get(2);
+    assertEquals(Integer.valueOf(1), rowValues[0]);
+    assertEquals(Boolean.TRUE, rowValues[1]);
+    assertEquals(Double.valueOf(1.1d), rowValues[2]);
+    assertEquals("1", rowValues[3]);
+
+    c5Value = (List) rowValues[4];
+    assertEquals(2, c5Value.size());
+    assertEquals(Integer.valueOf(1), c5Value.get(0));
+    assertEquals(Integer.valueOf(2), c5Value.get(1));
+
+    //c6Value = (Map) rowValues[5];
+    //assertEquals(2, c6Value.size());
+    //assertEquals("x", c6Value.get(Integer.valueOf(1)));
+    //assertEquals("y", c6Value.get(Integer.valueOf(2)));
+
+    //c7Value = (Map) rowValues[6];
+    //assertEquals(1, c7Value.size());
+    //assertEquals("v", c7Value.get("k"));
+
+    c8Value = (List) rowValues[7];
+    assertEquals("a", c8Value.get(0));
+    assertEquals(Integer.valueOf(9), c8Value.get(1));
+    assertEquals(Double.valueOf(2.2d), c8Value.get(2));
+
+    assertEquals(Byte.valueOf((byte) 1), rowValues[8]);
+    assertEquals(Short.valueOf((short) 1), rowValues[9]);
+    assertEquals(Float.valueOf(1.0f), rowValues[10]);
+    assertEquals(Long.valueOf(1l), rowValues[11]);
+
+    c13Value = (List) rowValues[12];
+    assertEquals(2, c13Value.size());
+    List listVal = (List) c13Value.get(0);
+    assertEquals("a", listVal.get(0));
+    assertEquals("b", listVal.get(1));
+    listVal = (List) c13Value.get(1);
+    assertEquals("c", listVal.get(0));
+    assertEquals("d", listVal.get(1));
+
+    //c14Value = (Map) rowValues[13];
+    //assertEquals(2, c14Value.size());
+    //Map mapVal = (Map) c14Value.get(Integer.valueOf(1));
+    //assertEquals(2, mapVal.size());
+    //assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
+    //assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
+    //mapVal = (Map) c14Value.get(Integer.valueOf(2));
+    //assertEquals(1, mapVal.size());
+    //assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
+
+    c15Value = (List) rowValues[14];
+    assertEquals(Integer.valueOf(1), c15Value.get(0));
+    listVal = (List) c15Value.get(1);
+    assertEquals(2, listVal.size());
+    assertEquals(Integer.valueOf(2), listVal.get(0));
+    assertEquals("x", listVal.get(1));
+
+    //c16Value = (List) rowValues[15];
+    //assertEquals(2, c16Value.size());
+    //listVal = (List) c16Value.get(0);
+    //assertEquals(2, listVal.size());
+    //mapVal = (Map) listVal.get(0);
+    //assertEquals(0, mapVal.size());
+    //assertEquals(Integer.valueOf(1), listVal.get(1));
+    //listVal = (List) c16Value.get(1);
+    //mapVal = (Map) listVal.get(0);
+    //assertEquals(2, mapVal.size());
+    //assertEquals("b", mapVal.get("a"));
+    //assertEquals("d", mapVal.get("c"));
+    //assertEquals(Integer.valueOf(2), listVal.get(1));
+
+    assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]);
+    assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
+    assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]);
+    assertEquals(Date.valueOf("2013-01-01"), rowValues[19]);
+    assertEquals("abc123", rowValues[20]);
+    assertEquals("abc123         ", rowValues[21]);
+    assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
+  }
+
+}
\ No newline at end of file
diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
new file mode 100644
index 0000000000..41926476ca
--- /dev/null
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.hive.llap.LlapRowInputFormat;
+import org.junit.BeforeClass;
+import org.apache.hadoop.mapred.InputFormat;
+
+/*
+ * TestJdbcWithMiniLlap for llap Row format.
+ */
+public class TestJdbcWithMiniLlapRow extends TestJdbcWithMiniLlap {
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    TestJdbcWithMiniLlap.beforeTest(false);
+  }
+
+  @Override
+  protected InputFormat getInputFormat() {
+    return new LlapRowInputFormat();
+  }
+}
\ No newline at end of file
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index a9ed3d200f..3820496c95 100644
--- llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -22,25 +22,15 @@
 import java.io.BufferedInputStream;
 import java.io.Closeable;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.DataInputStream;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.Schema;
 import org.apache.hadoop.hive.llap.io.ChunkedInputStream;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.JobConf;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -149,52 +139,61 @@ public boolean next(NullWritable key, V value) throws IOException {
           throw new IOException("Hit end of input, but did not find expected end of data indicator");
         }
 
-        // There should be a reader event available, or coming soon, so okay to be blocking call.
-        ReaderEvent event = getReaderEvent();
-        switch (event.getEventType()) {
-          case DONE:
-            break;
-          default:
-            throw new IOException("Expected reader event with done status, but got "
-                + event.getEventType() + " with message " + event.getMessage());
-        }
+        processReaderEvent();
         return false;
       }
     } catch (IOException io) {
-      try {
-        if (Thread.interrupted()) {
-          // Either we were interrupted by one of:
-          // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
-          // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
-          // Either way we should not try to block trying to read the reader events queue.
-          if (readerEvents.isEmpty()) {
-            // Case 2.
-            throw io;
-          } else {
-            // Case 1. Fail the reader, sending back the error we received from the reader event.
-            ReaderEvent event = getReaderEvent();
-            switch (event.getEventType()) {
-              case ERROR:
-                throw new IOException("Received reader event error: " + event.getMessage(), io);
-              default:
-                throw new IOException("Got reader event type " + event.getEventType()
-                    + ", expected error event", io);
-            }
-          }
-        } else {
-          // If we weren't interrupted, just propagate the error
+      failOnInterruption(io);
+      return false;
+    }
+  }
+
+  protected void processReaderEvent() throws IOException {
+    // There should be a reader event available, or coming soon, so okay to be blocking call.
+    ReaderEvent event = getReaderEvent();
+    switch (event.getEventType()) {
+      case DONE:
+        break;
+      default:
+        throw new IOException("Expected reader event with done status, but got "
+            + event.getEventType() + " with message " + event.getMessage());
+    }
+  }
+
+  protected void failOnInterruption(IOException io) throws IOException {
+    try {
+      if (Thread.interrupted()) {
+        // Either we were interrupted by one of:
+        // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
+        // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
+        // Either way we should not try to block trying to read the reader events queue.
+        if (readerEvents.isEmpty()) {
+          // Case 2.
          throw io;
+        } else {
+          // Case 1. Fail the reader, sending back the error we received from the reader event.
+          ReaderEvent event = getReaderEvent();
+          switch (event.getEventType()) {
+            case ERROR:
+              throw new IOException("Received reader event error: " + event.getMessage(), io);
+            default:
+              throw new IOException("Got reader event type " + event.getEventType()
+                  + ", expected error event", io);
+          }
        }
-      } finally {
-        // The external client handling umbilical responses and the connection to read the incoming
-        // data are not coupled. Calling close() here to make sure an error in one will cause the
-        // other to be closed as well.
-        try {
-          close();
-        } catch (Exception err) {
-          // Don't propagate errors from close() since this will lose the original error above.
-          LOG.error("Closing RecordReader due to error and hit another error during close()", err);
-        }
+      } else {
+        // If we weren't interrupted, just propagate the error
+        throw io;
+      }
+    } finally {
+      // The external client handling umbilical responses and the connection to read the incoming
+      // data are not coupled. Calling close() here to make sure an error in one will cause the
+      // other to be closed as well.
+      try {
+        close();
+      } catch (Exception err) {
+        // Don't propagate errors from close() since this will lose the original error above.
+        LOG.error("Closing RecordReader due to error and hit another error during close()", err);
      }
    }
  }
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
index 1cfbf3a86e..0316ce8ced 100644
--- llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
+++ llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
@@ -70,17 +70,17 @@
   private static final Logger LOG = LoggerFactory.getLogger(LlapRowRecordReader.class);
 
   protected final Configuration conf;
-  protected final RecordReader<NullWritable, BytesWritable> reader;
+  protected final RecordReader<NullWritable, ? extends Writable> reader;
   protected final Schema schema;
   protected final AbstractSerDe serde;
-  protected final BytesWritable data;
+  protected final Writable data;
 
   public LlapRowRecordReader(Configuration conf, Schema schema,
-      RecordReader<NullWritable, BytesWritable> reader) throws IOException {
+      RecordReader<NullWritable, ? extends Writable> reader) throws IOException {
     this.conf = conf;
     this.schema = schema;
     this.reader = reader;
-    this.data = new BytesWritable();
+    this.data = reader.createValue();
 
     try {
       serde = initSerDe(conf);
@@ -118,7 +118,7 @@ public float getProgress() throws IOException {
   public boolean next(NullWritable key, Row value) throws IOException {
     Preconditions.checkArgument(value != null);
 
-    boolean hasNext = reader.next(key, data);
+    boolean hasNext = reader.next(key, data);
 
     if (hasNext) {
       // Deserialize data to column values, and populate the row record
       Object rowObj;
@@ -216,7 +216,7 @@ static Object convertValue(Object val, ObjectInspector oi) {
     return convertedVal;
   }
 
-  static void setRowFromStruct(Row row, Object structVal, StructObjectInspector soi) {
+  protected static void setRowFromStruct(Row row, Object structVal, StructObjectInspector soi) {
     Schema structSchema = row.getSchema();
     // Add struct field data to the Row
     List structFields = soi.getAllStructFieldRefs();
@@ -230,6 +230,11 @@ static void setRowFromStruct(Row row, Object structVal, StructObjectInspector so
     }
   }
 
+  //Factory method for serDe
+  protected AbstractSerDe createSerDe() throws SerDeException {
+    return new LazyBinarySerDe();
+  }
+
   protected AbstractSerDe initSerDe(Configuration conf) throws SerDeException {
     Properties props = new Properties();
     StringBuilder columnsBuffer = new StringBuilder();
@@ -249,7 +254,7 @@ protected AbstractSerDe initSerDe(Configuration conf) throws SerDeException {
     props.put(serdeConstants.LIST_COLUMNS, columns);
     props.put(serdeConstants.LIST_COLUMN_TYPES, types);
     props.put(serdeConstants.ESCAPE_CHAR, "\\");
-    AbstractSerDe serde = new LazyBinarySerDe();
+    AbstractSerDe serde = createSerDe();
     serde.initialize(conf, props);
 
     return serde;
diff --git llap-ext-client/pom.xml llap-ext-client/pom.xml
index ed4704b4cd..295d3e6319 100644
--- llap-ext-client/pom.xml
+++ llap-ext-client/pom.xml
@@ -41,6 +41,11 @@
     <dependency>
       <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
       <artifactId>hive-llap-client</artifactId>
       <version>${project.version}</version>
diff --git llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
new file mode 100644
index 0000000000..d9c5666bc4
--- /dev/null
+++ llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.base.Preconditions;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+
+/*
+ * Read from Arrow stream batch-by-batch
+ */
+public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrapperWritable> {
+
+  private BufferAllocator allocator;
+  private ArrowStreamReader arrowStreamReader;
+
+  public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz,
+      JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException {
+    super(in, schema, clazz, job, client, socket);
+    allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit);
+    this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), allocator);
+  }
+
+  @Override
+  public boolean next(NullWritable key, ArrowWrapperWritable value) throws IOException {
+    try {
+      // Need a way to know what thread to interrupt, since this is a blocking thread.
+      setReaderThread(Thread.currentThread());
+
+      boolean hasInput = arrowStreamReader.loadNextBatch();
+      if (hasInput) {
+        VectorSchemaRoot vectorSchemaRoot = arrowStreamReader.getVectorSchemaRoot();
+        //There must be at least one column vector
+        Preconditions.checkState(vectorSchemaRoot.getFieldVectors().size() > 0);
+        if(vectorSchemaRoot.getFieldVectors().get(0).getValueCount() == 0) {
+          //An empty batch will appear at the end of the stream
+          return false;
+        }
+        value.setVectorSchemaRoot(arrowStreamReader.getVectorSchemaRoot());
+        return true;
+      } else {
+        processReaderEvent();
+        return false;
+      }
+    } catch (IOException io) {
+      failOnInterruption(io);
+      return false;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    arrowStreamReader.close();
+  }
+
+}
+
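For orientation, a minimal sketch of how a caller might drain this batch reader once LlapBaseInputFormat (constructed with useArrow enabled, see its hunk further below) hands it out; the reader variable and the per-vector handling are illustrative assumptions, not part of the patch:

  // Sketch only: batch-level consumption of LlapArrowBatchRecordReader.
  // 'reader' is assumed to come from LlapBaseInputFormat.getRecordReader() with useArrow = true.
  ArrowWrapperWritable batch = reader.createValue();
  while (reader.next(NullWritable.get(), batch)) {
    VectorSchemaRoot root = batch.getVectorSchemaRoot();
    // Every vector in a batch carries the same value count, i.e. the number of rows in the batch.
    int rows = root.getFieldVectors().get(0).getValueCount();
    for (FieldVector vector : root.getFieldVectors()) {
      // read vector.getObject(0) .. vector.getObject(rows - 1), or hand 'root' to other Arrow-aware code
    }
  }
  reader.close();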
diff --git llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
new file mode 100644
index 0000000000..fafbdee210
--- /dev/null
+++ llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import java.io.IOException;
+
+/*
+ * Adapts an Arrow batch reader to a row reader
+ */
+public class LlapArrowRowInputFormat implements InputFormat<NullWritable, Row> {
+
+  private LlapBaseInputFormat baseInputFormat;
+
+  public LlapArrowRowInputFormat(long arrowAllocatorLimit) {
+    baseInputFormat = new LlapBaseInputFormat(true, arrowAllocatorLimit);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return baseInputFormat.getSplits(job, numSplits);
+  }
+
+  @Override
+  public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+      throws IOException {
+    LlapInputSplit llapSplit = (LlapInputSplit) split;
+    LlapArrowBatchRecordReader reader =
+        (LlapArrowBatchRecordReader) baseInputFormat.getRecordReader(llapSplit, job, reporter);
+    return new LlapArrowRowRecordReader(job, reader.getSchema(), reader);
+  }
+}
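A hedged end-to-end sketch of the row-level path this adapter enables, mirroring processQuery() in TestJdbcWithMiniLlap above; the llap.if.* keys other than URL_KEY are assumed to exist on LlapBaseInputFormat as in the current class, and the connection values and query are placeholders:

  // Sketch only: read query results as Rows over the Arrow wire format.
  JobConf job = new JobConf();
  job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://localhost:10000/default");  // placeholder URL
  job.set(LlapBaseInputFormat.USER_KEY, System.getProperty("user.name"));        // assumed key
  job.set(LlapBaseInputFormat.PWD_KEY, "");                                      // assumed key
  job.set(LlapBaseInputFormat.QUERY_KEY, "select * from datatypes");             // assumed key

  LlapArrowRowInputFormat inputFormat = new LlapArrowRowInputFormat(Long.MAX_VALUE);
  for (InputSplit split : inputFormat.getSplits(job, 1)) {
    RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, Reporter.NULL);
    Row row = reader.createValue();
    while (reader.next(NullWritable.get(), row)) {
      // each Row is decoded from the buffered Arrow batch by LlapArrowRowRecordReader (next file)
    }
    reader.close();
  }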
diff --git llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
new file mode 100644
index 0000000000..d4179d5202
--- /dev/null
+++ llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.base.Preconditions;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Buffers a batch for reading one row at a time.
+ */
+public class LlapArrowRowRecordReader extends LlapRowRecordReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapArrowRowRecordReader.class);
+  private int rowIndex = 0;
+  private int batchSize = 0;
+
+  //Buffer one batch at a time, for row retrieval
+  private Object[][] currentBatch;
+
+  public LlapArrowRowRecordReader(Configuration conf, Schema schema,
+      RecordReader<NullWritable, ? extends Writable> reader) throws IOException {
+    super(conf, schema, reader);
+  }
+
+  @Override
+  public boolean next(NullWritable key, Row value) throws IOException {
+    Preconditions.checkArgument(value != null);
+    boolean hasNext = false;
+    ArrowWrapperWritable batchData = (ArrowWrapperWritable) data;
+    if((batchSize == 0) || (rowIndex == batchSize)) {
+      //This is either the first batch or we've used up the current batch buffer
+      batchSize = 0;
+      rowIndex = 0;
+      hasNext = reader.next(key, data);
+      if(hasNext) {
+        //There is another batch to buffer
+        try {
+          List<FieldVector> vectors = batchData.getVectorSchemaRoot().getFieldVectors();
+          //hasNext implies there is some column in the batch
+          Preconditions.checkState(vectors.size() > 0);
+          //All the vectors have the same length,
+          //we can get the number of rows from the first vector
+          batchSize = vectors.get(0).getValueCount();
+          ArrowWrapperWritable wrapper = new ArrowWrapperWritable(batchData.getVectorSchemaRoot());
+          currentBatch = (Object[][]) serde.deserialize(wrapper);
+          StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();
+          setRowFromStruct(value, currentBatch[rowIndex], rowOI);
+        } catch (Exception e) {
+          LOG.error("Failed to fetch Arrow batch", e);
+          throw new RuntimeException(e);
+        }
+      }
+      //If there were no more batches AND this was either the first batch or the current
+      //batch buffer was used up, fall through and return false
+    } else if(rowIndex < batchSize) {
+      //Take a row from the current buffered batch
+      hasNext = true;
+      StructObjectInspector rowOI = null;
+      try {
+        rowOI = (StructObjectInspector) serde.getObjectInspector();
+      } catch (SerDeException e) {
+        throw new RuntimeException(e);
+      }
+      setRowFromStruct(value, currentBatch[rowIndex], rowOI);
+    }
+    //Always increment the batch buffer index;
+    //if we return false, it is just a no-op
+    rowIndex++;
+    return hasNext;
+  }
+
+  protected AbstractSerDe createSerDe() throws SerDeException {
+    return new ArrowColumnarBatchSerDe();
+  }
+
+}
diff --git llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index f4c7fa4b30..9ad9f67795 100644
--- llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -49,11 +49,12 @@
 import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
 import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
-import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
 import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.NullWritable;
@@ -104,6 +105,8 @@
   private String user; // "hive",
   private String pwd;  // ""
   private String query;
+  private boolean useArrow;
+  private long arrowAllocatorLimit;
   private final Random rand = new Random();
 
   public static final String URL_KEY = "llap.if.hs2.connection";
@@ -123,7 +126,14 @@ public LlapBaseInputFormat(String url, String user, String pwd, String query) {
     this.query = query;
   }
 
-  public LlapBaseInputFormat() {}
+  public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) {
+    this.useArrow = useArrow;
+    this.arrowAllocatorLimit = arrowAllocatorLimit;
+  }
+
+  public LlapBaseInputFormat() {
+    this.useArrow = false;
+  }
 
   @SuppressWarnings("unchecked")
@@ -195,8 +205,16 @@ public LlapBaseInputFormat() {}
     LOG.info("Registered id: " + fragmentId);
 
     @SuppressWarnings("rawtypes")
-    LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(),
-        llapSplit.getSchema(), Text.class, job, llapClient, (java.io.Closeable)socket);
+    LlapBaseRecordReader recordReader;
+    if(useArrow) {
+      recordReader = new LlapArrowBatchRecordReader(
+          socket.getInputStream(), llapSplit.getSchema(),
+          ArrowWrapperWritable.class, job, llapClient, socket,
+          arrowAllocatorLimit);
+    } else {
+      recordReader = new LlapBaseRecordReader(socket.getInputStream(),
+          llapSplit.getSchema(), BytesWritable.class, job, llapClient, (java.io.Closeable)socket);
+    }
     umbilicalResponder.setRecordReader(recordReader);
     return recordReader;
   }
diff --git pom.xml pom.xml
index ce3da37d2b..139482b2cf 100644
--- pom.xml
+++ pom.xml
@@ -119,6 +119,7 @@
     3.5.2
     1.5.6
     0.1
+    <!-- Include arrow for LlapOutputFormatService -->
     0.8.0
     1.11.0
     1.7.7
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
index df7b53f42a..5ca3fb6513 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
@@ -15,26 +15,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hive.ql.io.arrow;
 
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-public class ArrowWrapperWritable implements Writable {
+public class ArrowWrapperWritable implements WritableComparable {
   private VectorSchemaRoot vectorSchemaRoot;
 
   public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot) {
     this.vectorSchemaRoot = vectorSchemaRoot;
   }
+  public ArrowWrapperWritable() {}
 
   public VectorSchemaRoot getVectorSchemaRoot() {
     return vectorSchemaRoot;
   }
 
+  public void setVectorSchemaRoot(VectorSchemaRoot vectorSchemaRoot) {
+    this.vectorSchemaRoot = vectorSchemaRoot;
+  }
+
   @Override
   public void write(DataOutput dataOutput) throws IOException {
     throw new UnsupportedOperationException();
@@ -44,4 +50,8 @@ public void write(DataOutput dataOutput) throws IOException {
   public void readFields(DataInput dataInput) throws IOException {
     throw new UnsupportedOperationException();
   }
+
+  @Override public int compareTo(Object o) {
+    return 0;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
index 78cc188e65..7aa732bd5c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hive.ql.io.arrow;
 
 import org.apache.arrow.memory.RootAllocator;
@@ -41,4 +42,12 @@ public synchronized RootAllocator getRootAllocator(Configuration conf) {
     }
     return rootAllocator;
   }
+
+  //arrowAllocatorLimit is ignored if an allocator was previously created
+  public synchronized RootAllocator getOrCreateRootAllocator(long arrowAllocatorLimit) {
+    if (rootAllocator == null) {
+      rootAllocator = new RootAllocator(arrowAllocatorLimit);
+    }
+    return rootAllocator;
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index 13a3070ef6..f27cdf4969 100644
--- ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -54,6 +54,7 @@ public static void setUp() throws Exception {
     Configuration conf = new Configuration();
     // Pick random avail port
     HiveConf.setIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT, 0);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
     LlapOutputFormatService.initializeAndStart(conf, null);
     service = LlapOutputFormatService.get();
     LlapProxy.setDaemon(true);
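One behavioral note on the allocator helper added in RootAllocatorFactory above, with a small sketch: because the root allocator is a per-process singleton, the limit passed by the first caller wins and later limits are ignored, as the comment on getOrCreateRootAllocator() states. The 2 GB figure below is an arbitrary example, not a recommended setting.

  // Sketch only: the first caller fixes the ceiling for the shared root allocator.
  long limit = 2L * 1024 * 1024 * 1024;  // arbitrary example ceiling
  RootAllocator a1 = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(limit);
  RootAllocator a2 = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(Long.MAX_VALUE);
  assert a1 == a2;  // same instance; the second limit argument has no effect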