diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index ab79b42aa9..8467cea58c 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -107,7 +107,7 @@
   private static Path dataTypesFilePath;
 
   protected static HiveConf conf = null;
-  private static Connection hs2Conn = null;
+  protected static Connection hs2Conn = null;
 
   // This method should be called by sub-classes in a @BeforeClass initializer
   public static MiniHS2 beforeTest(HiveConf inputConf) throws Exception {
diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
index 55a2df801e..35eda6cb0a 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
@@ -18,11 +18,18 @@
 package org.apache.hive.jdbc;
 
+import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertArrayEquals;
+
 import java.math.BigDecimal;
+
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.hive.common.type.Date;
 import org.apache.hadoop.hive.common.type.Timestamp;
+
+import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.hive.llap.FieldDesc;
 import org.apache.hadoop.hive.llap.Row;
@@ -33,6 +40,7 @@
 import org.apache.hadoop.mapred.InputFormat;
 
 import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+import org.junit.Test;
 
 /**
  * TestJdbcWithMiniLlap for Arrow format with vectorized output sink
@@ -231,5 +239,173 @@ public void testDataTypes() throws Exception {
     assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
   }
+
+  @Test
+  public void testTypesNestedInListWithLimitAndFilters() throws Exception {
+    try (Statement statement = hs2Conn.createStatement()) {
+      statement.execute("CREATE TABLE complex_tbl(c1 array<string>, " +
+          "c2 array<struct<f1:string,f2:string>>, " +
+          "c3 array<array<struct<f1:string,f2:string>>>, " +
+          "c4 int) STORED AS ORC");
+
+      statement.executeUpdate("INSERT INTO complex_tbl VALUES " +
+          "(" +
+          "ARRAY('a1', 'a2', 'a3', null), " +
+          "ARRAY(NAMED_STRUCT('f1','a1', 'f2','a2'), NAMED_STRUCT('f1','a3', 'f2','a4')), " +
+          "ARRAY((ARRAY(NAMED_STRUCT('f1','a1', 'f2','a2'), NAMED_STRUCT('f1','a3', 'f2','a4')))), " +
+          "1), " +
+          "(" +
+          "ARRAY('b1'), " +
+          "ARRAY(NAMED_STRUCT('f1','b1', 'f2','b2'), NAMED_STRUCT('f1','b3', 'f2','b4')), " +
+          "ARRAY((ARRAY(NAMED_STRUCT('f1','b1', 'f2','b2'), NAMED_STRUCT('f1','b3', 'f2','b4'))), " +
+          "(ARRAY(NAMED_STRUCT('f1','b5', 'f2','b6'), NAMED_STRUCT('f1','b7', 'f2','b8')))), " +
+          "2), " +
+          "(" +
+          "ARRAY('c1', 'c2'), ARRAY(NAMED_STRUCT('f1','c1', 'f2','c2'), NAMED_STRUCT('f1','c3', 'f2','c4'), " +
+          "NAMED_STRUCT('f1','c5', 'f2','c6')), ARRAY((ARRAY(NAMED_STRUCT('f1','c1', 'f2','c2'), " +
+          "NAMED_STRUCT('f1','c3', 'f2','c4'))), (ARRAY(NAMED_STRUCT('f1','c5', 'f2','c6'), " +
+          "NAMED_STRUCT('f1','c7', 'f2','c8'))), (ARRAY(NAMED_STRUCT('f1','c9', 'f2','c10'), " +
+          "NAMED_STRUCT('f1','c11', 'f2','c12')))), " +
+          "3), " +
+          "(" +
+          "ARRAY(null), " +
+          "ARRAY(NAMED_STRUCT('f1','d1', 'f2','d2'), NAMED_STRUCT('f1','d3', 'f2','d4'), " +
+          "NAMED_STRUCT('f1','d5', 'f2','d6'), NAMED_STRUCT('f1','d7', 'f2','d8')), " +
"ARRAY((ARRAY(NAMED_STRUCT('f1','d1', 'f2', 'd2')))), " + + "4)"); + + } + + List expected = new ArrayList<>(); + expected.add(new Object[]{ + asList("a1", "a2", "a3", null), + asList(asList("a1", "a2"), asList("a3", "a4")), + asList(asList(asList("a1", "a2"), asList("a3", "a4"))), + 1 + }); + expected.add(new Object[]{ + asList("b1"), + asList(asList("b1", "b2"), asList("b3", "b4")), + asList(asList(asList("b1", "b2"), asList("b3", "b4")), asList(asList("b5", "b6"), asList("b7", "b8"))), + 2 + }); + expected.add(new Object[]{ + asList("c1", "c2"), + asList(asList("c1", "c2"), asList("c3", "c4"), asList("c5", "c6")), + asList(asList(asList("c1", "c2"), asList("c3", "c4")), asList(asList("c5", "c6"), asList("c7", "c8")), + asList(asList("c9", "c10"), asList("c11", "c12"))), + 3 + }); + List nullList = new ArrayList<>(); + nullList.add(null); + expected.add(new Object[]{ + nullList, + asList(asList("d1", "d2"), asList("d3", "d4"), asList("d5", "d6"), asList("d7", "d8")), + asList(asList(asList("d1", "d2"))), + 4 + }); + + // test without limit and filters (i.e VectorizedRowBatch#selectedInUse=false) + RowCollector2 rowCollector = new RowCollector2(); + String query = "select * from complex_tbl"; + processQuery(query, 1, rowCollector); + verifyResult(rowCollector.rows, expected.get(0), + expected.get(1), + expected.get(2), + expected.get(3)); + + // test with filter + rowCollector = new RowCollector2(); + query = "select * from complex_tbl where c4 > 1 "; + processQuery(query, 1, rowCollector); + verifyResult(rowCollector.rows, expected.get(1), expected.get(2), expected.get(3)); + + // test with limit + rowCollector = new RowCollector2(); + query = "select * from complex_tbl limit 3"; + processQuery(query, 1, rowCollector); + verifyResult(rowCollector.rows, expected.get(0), expected.get(1), expected.get(2)); + + // test with filters and limit + rowCollector = new RowCollector2(); + query = "select * from complex_tbl where c4 > 1 limit 2"; + processQuery(query, 1, rowCollector); + verifyResult(rowCollector.rows, expected.get(1), expected.get(2)); + + } + + @Test + public void testTypesNestedInMapWithLimitAndFilters() throws Exception { + try (Statement statement = hs2Conn.createStatement()) { + statement.execute("CREATE TABLE complex_tbl2(c1 map," + + " c2 map>, " + + " c3 map>, c4 int) STORED AS ORC"); + + statement.executeUpdate("INSERT INTO complex_tbl2 VALUES " + + "(MAP(1, 'a1'), MAP(1, ARRAY('a1', 'a2')), MAP(1, NAMED_STRUCT('f1','a1', 'f2','a2')), " + + "1), " + + "(MAP(1, 'b1',2, 'b2'), MAP(1, ARRAY('b1', 'b2'), 2, ARRAY('b3') ), " + + "MAP(1, NAMED_STRUCT('f1','b1', 'f2','b2')), " + + "2), " + + "(MAP(1, 'c1',2, 'c2'), MAP(1, ARRAY('c1', 'c2'), 2, ARRAY('c3') ), " + + "MAP(1, NAMED_STRUCT('f1','c1', 'f2','c2'), 2, NAMED_STRUCT('f1', 'c3', 'f2', 'c4') ), " + + "3)"); + + } + + List expected = new ArrayList<>(); + expected.add(new Object[]{ + ImmutableMap.of(1, "a1"), + ImmutableMap.of(1, asList("a1", "a2")), + ImmutableMap.of(1, asList("a1", "a2")), + 1, + }); + expected.add(new Object[]{ + ImmutableMap.of(1, "b1", 2, "b2"), + ImmutableMap.of(1, asList("b1", "b2"), 2, asList("b3")), + ImmutableMap.of(1, asList("b1", "b2")), + 2, + }); + expected.add(new Object[]{ + ImmutableMap.of(1, "c1", 2, "c2"), + ImmutableMap.of(1, asList("c1", "c2"), 2, asList("c3")), + ImmutableMap.of(1, asList("c1", "c2"), 2, asList("c3", "c4")), + 3, + }); + + + // test without limit and filters (i.e. 
+    RowCollector2 rowCollector = new RowCollector2();
+    String query = "select * from complex_tbl2";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(0), expected.get(1), expected.get(2));
+
+    // test with filter
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl2 where c4 > 1";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(1), expected.get(2));
+
+    // test with limit
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl2 limit 2";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(0), expected.get(1));
+
+    // test with filters and limit
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl2 where c4 > 1 limit 1";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(1));
+  }
+
+  private void verifyResult(List<Object[]> actual, Object[]... expected) {
+    assertEquals(expected.length, actual.size());
+    for (int i = 0; i < expected.length; i++) {
+      assertArrayEquals(expected[i], actual.get(i));
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
index 5ec800d1e6..035097759a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MultiValuedColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
@@ -314,7 +315,11 @@ private void writeMap(ListVector arrowVector, MapColumnVector hiveVector, MapTyp
     final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
     for (int rowIndex = 0; rowIndex < size; rowIndex++) {
-      if (hiveVector.isNull[rowIndex]) {
+      int selectedIndex = rowIndex;
+      if (vectorizedRowBatch.selectedInUse) {
+        selectedIndex = vectorizedRowBatch.selected[rowIndex];
+      }
+      if (hiveVector.isNull[selectedIndex]) {
         BitVectorHelper.setValidityBit(validityBuffer, rowIndex, 0);
       } else {
         BitVectorHelper.setValidityBitToOne(validityBuffer, rowIndex);
@@ -365,27 +370,75 @@ private void writeStruct(NonNullableStructVector arrowVector, StructColumnVector
     }
   }
 
+  // selected[] points to the valid/filtered/selected records at the row level.
+  // For a MultiValuedColumnVector such as ListColumnVector, one record of the vector points to multiple nested
+  // records. The child vectors hold these records in exploded form, i.e. a child vector can hold more records
+  // than the VectorizedRowBatch itself, so selected[] needs to be readjusted accordingly.
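+  // For example (hypothetical values): with selected = [0, 2], offsets = [0, 3, 5] and lengths = [3, 2, 1],
+  // the corrected selected[] becomes [0, 1, 2, 5] and the corrected size becomes 4.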
+  // This method creates a shallow copy of the VectorizedRowBatch with the corrected size and selected[].
+  private static VectorizedRowBatch correctSelectedAndSize(VectorizedRowBatch sourceVrb,
+      MultiValuedColumnVector multiValuedColumnVector) {
+
+    VectorizedRowBatch vrb = new VectorizedRowBatch(sourceVrb.numCols, sourceVrb.size);
+    vrb.cols = sourceVrb.cols;
+    vrb.endOfFile = sourceVrb.endOfFile;
+    vrb.projectedColumns = sourceVrb.projectedColumns;
+    vrb.projectionSize = sourceVrb.projectionSize;
+    vrb.selectedInUse = sourceVrb.selectedInUse;
+    vrb.setPartitionInfo(sourceVrb.getDataColumnCount(), sourceVrb.getPartitionColumnCount());
+
+    int correctedSize = 0;
+    final int[] srcVrbSelected = sourceVrb.selected;
+    for (int i = 0; i < sourceVrb.size; i++) {
+      correctedSize += multiValuedColumnVector.lengths[srcVrbSelected[i]];
+    }
+
+    int newIndex = 0;
+    final int[] selectedOffsetsCorrected = new int[correctedSize];
+    for (int i = 0; i < sourceVrb.size; i++) {
+      long elementIndex = multiValuedColumnVector.offsets[srcVrbSelected[i]];
+      long elementSize = multiValuedColumnVector.lengths[srcVrbSelected[i]];
+      for (int j = 0; j < elementSize; j++) {
+        selectedOffsetsCorrected[newIndex++] = (int) (elementIndex + j);
+      }
+    }
+    vrb.selected = selectedOffsetsCorrected;
+    vrb.size = correctedSize;
+    return vrb;
+  }
+
   private void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo, int size,
-                         VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
+      VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
     final int OFFSET_WIDTH = 4;
     final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo();
     final ColumnVector hiveElementVector = hiveVector.child;
     final FieldVector arrowElementVector =
-            (FieldVector) arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector();
-    arrowElementVector.setInitialCapacity(hiveVector.childCount);
+        (FieldVector) arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector();
+
+    VectorizedRowBatch correctedVrb = vectorizedRowBatch;
+    int correctedSize = hiveVector.childCount;
+    if (vectorizedRowBatch.selectedInUse) {
+      correctedVrb = correctSelectedAndSize(vectorizedRowBatch, hiveVector);
+      correctedSize = correctedVrb.size;
+    }
+    arrowElementVector.setInitialCapacity(correctedSize);
     arrowElementVector.allocateNew();
-    write(arrowElementVector, hiveElementVector, elementTypeInfo, hiveVector.childCount, vectorizedRowBatch, isNative);
+    write(arrowElementVector, hiveElementVector, elementTypeInfo, correctedSize, correctedVrb, isNative);
 
     final ArrowBuf offsetBuffer = arrowVector.getOffsetBuffer();
     int nextOffset = 0;
 
     for (int rowIndex = 0; rowIndex < size; rowIndex++) {
-      if (hiveVector.isNull[rowIndex]) {
+      int selectedIndex = rowIndex;
+      if (vectorizedRowBatch.selectedInUse) {
+        selectedIndex = vectorizedRowBatch.selected[rowIndex];
+      }
+      if (hiveVector.isNull[selectedIndex]) {
         offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset);
       } else {
         offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset);
-        nextOffset += (int) hiveVector.lengths[rowIndex];
+        nextOffset += (int) hiveVector.lengths[selectedIndex];
         arrowVector.setNotNull(rowIndex);
       }
     }
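
Illustration only (not part of the patch): a minimal, self-contained Java sketch of the selected[]
remapping that correctSelectedAndSize performs above, with plain arrays standing in for Hive's
VectorizedRowBatch and MultiValuedColumnVector. The class and method names here are hypothetical.

    public class SelectedRemapSketch {

      // Remap row-level selected[] indices to child-vector indices of a list column.
      static int[] remapSelected(int[] selected, int size, long[] offsets, long[] lengths) {
        int correctedSize = 0;
        for (int i = 0; i < size; i++) {
          correctedSize += lengths[selected[i]];     // total child elements across selected rows
        }
        int[] corrected = new int[correctedSize];
        int next = 0;
        for (int i = 0; i < size; i++) {
          long start = offsets[selected[i]];         // first child element of this row's list
          for (int j = 0; j < lengths[selected[i]]; j++) {
            corrected[next++] = (int) (start + j);   // every child element of a selected row survives
          }
        }
        return corrected;
      }

      public static void main(String[] args) {
        // Rows 0 and 2 survive filtering; their lists hold 3 and 1 child elements respectively.
        int[] remapped = remapSelected(new int[]{0, 2}, 2, new long[]{0, 3, 5}, new long[]{3, 2, 1});
        System.out.println(java.util.Arrays.toString(remapped));  // prints [0, 1, 2, 5]
      }
    }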