diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 0fac1e4b7f..3c0532c9d0 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -73,6 +73,7 @@
   private static String dataFileDir;
   private static Path kvDataFilePath;
   private static Path dataTypesFilePath;
+  private static Path over10KFilePath;

   protected static MiniHS2 miniHS2 = null;
   protected static HiveConf conf = null;
@@ -86,6 +87,7 @@ public static MiniHS2 beforeTest(HiveConf inputConf) throws Exception {
     dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
     kvDataFilePath = new Path(dataFileDir, "kv1.txt");
     dataTypesFilePath = new Path(dataFileDir, "datatypes.txt");
+    over10KFilePath = new Path(dataFileDir, "over10k");
     Map<String, String> confOverlay = new HashMap<String, String>();
     miniHS2.start(confOverlay);
     miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
@@ -185,6 +187,21 @@ protected void createDataTypesTable(String tableName) throws Exception {
     stmt.close();
   }

+  protected void createOver10KTable(String tableName) throws Exception {
+    try (Statement stmt = hs2Conn.createStatement()) {
+
+      String createQuery =
+          "create table " + tableName + " (t tinyint, si smallint, i int, b bigint, f float, d double, bo boolean, " +
+              "s string, ts timestamp, `dec` decimal(4,2), bin binary) row format delimited fields terminated by '|'";
+
+      // create table
+      stmt.execute("DROP TABLE IF EXISTS " + tableName);
+      stmt.execute(createQuery);
+      // load data
+      stmt.execute("load data local inpath '" + over10KFilePath.toString() + "' into table " + tableName);
+    }
+  }
+
   @Test(timeout = 60000)
   public void testLlapInputFormatEndToEnd() throws Exception {
     createTestTable("testtab1");
@@ -206,6 +223,34 @@ public void testLlapInputFormatEndToEnd() throws Exception {
     assertEquals(0, rowCount);
   }

+  @Test(timeout = 300000)
+  public void testLlapInputFormatEndToEndWithMultipleBatches() throws Exception {
+    String tableName = "over10k_table";
+
+    createOver10KTable(tableName);
+
+    int rowCount;
+
+    // Try with more than one batch
+    RowCollector rowCollector = new RowCollector();
+    String query = "select * from " + tableName;
+    rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(9999, rowCount);
+
+    // Try with less than one batch
+    rowCollector.rows.clear();
+    query = "select * from " + tableName + " where s = 'rachel brown'";
+    rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(17, rowCount);
+
+    // Try empty rows query
+    rowCollector.rows.clear();
+    query = "select * from " + tableName + " where false";
+    rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(0, rowCount);
+  }
+
+
   @Test(timeout = 60000)
   public void testNonAsciiStrings() throws Exception {
     createTestTable("testtab_nonascii");
diff --git llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
index d4179d5202..24a82c7f32 100644
--- llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
+++ llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
@@ -61,16 +61,22 @@ public boolean next(NullWritable key, Row value) throws IOException {
       //This is either the first batch or we've used up the current batch buffer
       batchSize = 0;
       rowIndex = 0;
-      hasNext = reader.next(key, data);
-      if(hasNext) {
+
+      // since HIVE-22856, a zero length batch doesn't mean that we won't have any more batches
+      // we can have more batches with data even after a zero length batch
+      // we should keep trying until we get a batch with some data or reader.next() returns false
+      while (batchSize == 0 && (hasNext = reader.next(key, data))) {
+        List<FieldVector> vectors = batchData.getVectorSchemaRoot().getFieldVectors();
+        //hasNext implies there is some column in the batch
+        Preconditions.checkState(vectors.size() > 0);
+        //All the vectors have the same length,
+        //we can get the number of rows from the first vector
+        batchSize = vectors.get(0).getValueCount();
+      }
+
+      if (hasNext) {
         //There is another batch to buffer
         try {
-          List<FieldVector> vectors = batchData.getVectorSchemaRoot().getFieldVectors();
-          //hasNext implies there is some column in the batch
-          Preconditions.checkState(vectors.size() > 0);
-          //All the vectors have the same length,
-          //we can get the number of rows from the first vector
-          batchSize = vectors.get(0).getValueCount();
           ArrowWrapperWritable wrapper = new ArrowWrapperWritable(batchData.getVectorSchemaRoot());
           currentBatch = (Object[][]) serde.deserialize(wrapper);
           StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();
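
The heart of the fix above is the drain loop in LlapArrowRowRecordReader.next(): since HIVE-22856 a zero-length Arrow batch no longer signals end-of-stream, so the reader keeps polling until it sees a batch with rows or reader.next() returns false. A minimal standalone sketch of that pattern, assuming a hypothetical BatchSource whose nextBatchRowCount() yields per-batch row counts and -1 at end-of-stream (these names are illustrative, not Hive or Arrow APIs):

// Sketch of the skip-empty-batches pattern from the LlapArrowRowRecordReader fix.
// BatchSource and nextBatchRowCount() are hypothetical stand-ins for the Arrow
// batch reader; they are not Hive or Arrow APIs.
import java.util.Iterator;
import java.util.List;

public class DrainEmptyBatches {

  /** Hypothetical batch source; each call yields one batch's row count, -1 at end. */
  interface BatchSource {
    int nextBatchRowCount();
  }

  /**
   * Returns the row count of the next non-empty batch, or -1 when the source is
   * exhausted. Mirrors the patched loop: a zero-length batch does not terminate
   * reading; we keep polling until we see data or the source reports end-of-stream.
   */
  static int nextNonEmptyBatchSize(BatchSource source) {
    int batchSize = 0;
    int rows;
    while (batchSize == 0 && (rows = source.nextBatchRowCount()) >= 0) {
      batchSize = rows;
    }
    return batchSize > 0 ? batchSize : -1;
  }

  public static void main(String[] args) {
    // Zero-length batches interleaved with real data, as can occur since HIVE-22856.
    Iterator<Integer> counts = List.of(0, 0, 5, 0, 3, 0).iterator();
    BatchSource source = () -> counts.hasNext() ? counts.next() : -1;
    int size;
    while ((size = nextNonEmptyBatchSize(source)) != -1) {
      System.out.println("buffered a batch of " + size + " rows");
    }
    // prints a line for the 5-row and 3-row batches; empty batches are skipped
  }
}

The patch has the same shape: hasNext doubles as the end-of-stream flag, and a non-zero batchSize is the "found data" condition that lets the buffering branch run, which is exactly what the new testLlapInputFormatEndToEndWithMultipleBatches exercises with multi-batch, sub-batch, and empty result sets.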