From 3d1daefc65fb3c4bee045c90a77ad6866840575b Mon Sep 17 00:00:00 2001 From: Panos Garefalakis Date: Thu, 28 May 2020 14:22:49 +0100 Subject: [PATCH] HIVE-23561: Fixing arrow serializer for Decimals with selected Change-Id: Ie92fe13f134c71d2510dd82a9cbee39fe90a2273 --- .../ql/io/arrow/ArrowColumnarBatchSerDe.java | 4 +- .../hadoop/hive/ql/io/arrow/Serializer.java | 8 +- .../io/arrow/TestArrowColumnarBatchSerDe.java | 160 +++++++++++++++++- 3 files changed, 161 insertions(+), 11 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java index 04087073402..4896bc4190e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.io.arrow; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.complex.impl.UnionListWriter; @@ -97,7 +98,8 @@ StructObjectInspector rowObjectInspector; Configuration conf; - private Serializer serializer; + @VisibleForTesting + Serializer serializer; private Deserializer deserializer; @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java index d5a9b2c2126..5a79641ea8c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.io.arrow; +import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ArrowBuf; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; @@ -44,8 +45,6 @@ import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.hadoop.conf.Configuration; -import org.apache.arrow.vector.util.DecimalUtility; -import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; @@ -105,7 +104,8 @@ private final static byte[] EMPTY_BYTES = new byte[0]; // Hive columns - private final VectorizedRowBatch vectorizedRowBatch; + @VisibleForTesting + final VectorizedRowBatch vectorizedRowBatch; private final VectorAssignRow vectorAssignRow; private int batchSize; private BufferAllocator allocator; @@ -923,7 +923,7 @@ private static void writeGeneric(final FieldVector fieldVector, final ColumnVect final int scale = decimalVector.getScale(); decimalVector.set(i, ((DecimalColumnVector) hiveVector).vector[j].getHiveDecimal().bigDecimalValue().setScale(scale)); - final HiveDecimalWritable writable = ((DecimalColumnVector) hiveVector).vector[i]; + final HiveDecimalWritable writable = ((DecimalColumnVector) hiveVector).vector[j]; decimalHolder.precision = writable.precision(); decimalHolder.scale = scale; try (ArrowBuf arrowBuf = allocator.buffer(DecimalHolder.WIDTH)) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java index be15197d0bd..d5aaa9ef02b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java @@ -203,7 +203,14 @@ private static HiveDecimalWritable decimalW(HiveDecimal value) { private void initAndSerializeAndDeserialize(String[][] schema, Object[][] rows) throws SerDeException { ArrowColumnarBatchSerDe serDe = new ArrowColumnarBatchSerDe(); StructObjectInspector rowOI = initSerDe(serDe, schema); - serializeAndDeserialize(serDe, rows, rowOI); + serializeAndDeserialize(serDe, rows, rowOI, null); + } + + private void initAndSerializeAndDeserializeSelected(String[][] schema, Object[][] rows, int[] selected) + throws SerDeException { + ArrowColumnarBatchSerDe serDe = new ArrowColumnarBatchSerDe(); + StructObjectInspector rowOI = initSerDe(serDe, schema); + serializeAndDeserialize(serDe, rows, rowOI, selected); } private StructObjectInspector initSerDe(AbstractSerDe serDe, String[][] schema) @@ -232,19 +239,34 @@ private StructObjectInspector initSerDe(AbstractSerDe serDe, String[][] schema) } private void serializeAndDeserialize(ArrowColumnarBatchSerDe serDe, Object[][] rows, - StructObjectInspector rowOI) { + StructObjectInspector rowOI, int[] selectedRows) { ArrowWrapperWritable serialized = null; for (Object[] row : rows) { serialized = serDe.serialize(row, rowOI); } +// When obj is null serialized is not Null -- is this expected? +// assertTrue(serialized == null); + boolean useNativeSelected = selectedRows != null && selectedRows.length > 0; + // Pass null to complete a batch if (serialized == null) { - serialized = serDe.serialize(null, rowOI); + // Native-selected mode (triggering Serializer.writePrimitive) + if (useNativeSelected) { + serDe.serializer.vectorizedRowBatch.selectedInUse = true; + serDe.serializer.vectorizedRowBatch.size = selectedRows.length; + serDe.serializer.vectorizedRowBatch.selected = selectedRows; + // Call Native serialization directly + serialized = serDe.serializer.serializeBatch(serDe.serializer.vectorizedRowBatch, true); + } else { + // Non-native mode + serialized = serDe.serialize(null, rowOI); + } } final Object[][] deserializedRows = (Object[][]) serDe.deserialize(serialized); for (int rowIndex = 0; rowIndex < Math.min(deserializedRows.length, rows.length); rowIndex++) { - final Object[] row = rows[rowIndex]; + // expected row is either at rowIndex or selected[rowIndex] + final Object[] row = useNativeSelected ? rows[selectedRows[rowIndex]] : rows[rowIndex]; final Object[] deserializedRow = deserializedRows[rowIndex]; assertEquals(row.length, deserializedRow.length); @@ -398,6 +420,18 @@ public void testPrimitiveInteger() throws SerDeException { initAndSerializeAndDeserialize(schema, INTEGER_ROWS); } + @Test + public void testPrimitiveIntegerSelected() throws SerDeException { + String[][] schema = { + {"tinyint1", "tinyint"}, + {"smallint1", "smallint"}, + {"int1", "int"}, + {"bigint1", "bigint"} + }; + int[] selectedRows = new int[] {0, 3, 5}; + initAndSerializeAndDeserializeSelected(schema, INTEGER_ROWS, selectedRows); + } + @Test public void testPrimitiveBigInt10000() throws SerDeException { String[][] schema = { @@ -414,7 +448,7 @@ public void testPrimitiveBigInt10000() throws SerDeException { integerRows[i] = new Object[] {longW(i + j * batchSize)}; } - serializeAndDeserialize(serDe, integerRows, rowOI); + serializeAndDeserialize(serDe, integerRows, rowOI, null); } } @@ -436,7 +470,7 @@ public void testPrimitiveBigIntRandom() { integerRows[i] = new Object[] {longW(random.nextLong())}; } - serializeAndDeserialize(serDe, integerRows, rowOI); + serializeAndDeserialize(serDe, integerRows, rowOI, null); } } catch (Exception e) { throw new RuntimeException(e); @@ -453,6 +487,16 @@ public void testPrimitiveFloat() throws SerDeException { initAndSerializeAndDeserialize(schema, FLOAT_ROWS); } + @Test + public void testPrimitiveFloatSelected() throws SerDeException { + String[][] schema = { + {"float1", "float"}, + {"double1", "double"}, + }; + int[] selectedRows = new int[] {0, 3, 5, 7, 9}; + initAndSerializeAndDeserializeSelected(schema, FLOAT_ROWS, selectedRows); + } + @Test(expected = AssertionError.class) public void testPrimitiveFloatNaN() throws SerDeException { String[][] schema = { @@ -486,6 +530,17 @@ public void testPrimitiveString() throws SerDeException { initAndSerializeAndDeserialize(schema, STRING_ROWS); } + @Test + public void testPrimitiveStringSelected() throws SerDeException { + String[][] schema = { + {"string1", "string"}, + {"char1", "char(10)"}, + {"varchar1", "varchar(10)"}, + }; + int[] selectedRows = new int[] {0, 2, 4}; + initAndSerializeAndDeserializeSelected(schema, STRING_ROWS, selectedRows); + } + @Test public void testPrimitiveDTI() throws SerDeException { String[][] schema = { @@ -498,6 +553,18 @@ public void testPrimitiveDTI() throws SerDeException { initAndSerializeAndDeserialize(schema, DTI_ROWS); } + @Test + public void testPrimitiveDTISelected() throws SerDeException { + String[][] schema = { + {"date1", "date"}, + {"timestamp1", "timestamp"}, + {"interval_year_month1", "interval_year_month"}, + {"interval_day_time1", "interval_day_time"}, + }; + int[] selectedRows = new int[] {0, 2}; + initAndSerializeAndDeserializeSelected(schema, DTI_ROWS, selectedRows); + } + @Test public void testPrimitiveRandomTimestamp() throws SerDeException { String[][] schema = { @@ -556,6 +623,30 @@ public void testPositiveNegativeTSWithNanos() throws SerDeException { initAndSerializeAndDeserialize(schema, tsRows); } + @Test + public void testPositiveNegativeTSWithNanosSelected() throws SerDeException { + String[][] schema = { + {"timestamp1", "timestamp"}, + }; + + Object[][] tsRows = new Object[][]{ + {new TimestampWritableV2(Timestamp.valueOf("1963-04-01 09:01:10.123"))}, + {new TimestampWritableV2(Timestamp.valueOf("1800-04-01 09:01:10.123999"))}, + {new TimestampWritableV2(Timestamp.valueOf("1750-04-01 09:01:10.123999"))}, + {new TimestampWritableV2(Timestamp.valueOf("1700-04-01 09:01:10.999999"))}, + {new TimestampWritableV2(Timestamp.valueOf("2050-04-01 09:01:10.999999"))}, + {new TimestampWritableV2(Timestamp.valueOf("1991-06-05 09:01:10.999999"))}, + {new TimestampWritableV2(Timestamp.valueOf("1992-11-04 09:01:10.999999"))}, + {new TimestampWritableV2(Timestamp.valueOf("1970-01-01 00:00:00"))}, + {new TimestampWritableV2(Timestamp.valueOf("1964-01-01 00:00:04.78"))}, + {new TimestampWritableV2(Timestamp.valueOf("1950-01-01 09:23:03.21"))}, + {new TimestampWritableV2(Timestamp.valueOf("1956-01-01 10:09:03.00"))}, + {new TimestampWritableV2(Timestamp.valueOf("1947-08-27 10:25:36.26"))}, + }; + int[] selectedRows = new int[] {0, 2, 5, 7, 9, 11}; + initAndSerializeAndDeserializeSelected(schema, tsRows, selectedRows); + } + @Test public void testPrimitiveDecimal() throws SerDeException { String[][] schema = { @@ -565,6 +656,12 @@ public void testPrimitiveDecimal() throws SerDeException { initAndSerializeAndDeserialize(schema, DECIMAL_ROWS); } + @Test public void testNativeDecimalSelected() throws SerDeException { + String[][] schema = { { "decimal1", "decimal(38,10)" }, }; + int[] selectedRows = new int[] { 0, 2, 4 }; + initAndSerializeAndDeserializeSelected(schema, DECIMAL_ROWS, selectedRows); + } + @Test public void testRandomPrimitiveDecimal() throws SerDeException { String[][] schema = { @@ -599,6 +696,15 @@ public void testPrimitiveBoolean() throws SerDeException { initAndSerializeAndDeserialize(schema, BOOLEAN_ROWS); } + @Test + public void testPrimitiveBooleanSelected() throws SerDeException { + String[][] schema = { + {"boolean1", "boolean"}, + }; + int[] selectedRows = new int[] {1, 2}; + initAndSerializeAndDeserializeSelected(schema, BOOLEAN_ROWS, selectedRows); + } + @Test public void testPrimitiveBinary() throws SerDeException { String[][] schema = { @@ -608,6 +714,15 @@ public void testPrimitiveBinary() throws SerDeException { initAndSerializeAndDeserialize(schema, BINARY_ROWS); } + @Test + public void testPrimitiveBinarySelected() throws SerDeException { + String[][] schema = { + {"binary1", "binary"}, + }; + int[] selectedRows = new int[] {1, 3}; + initAndSerializeAndDeserializeSelected(schema, BINARY_ROWS, selectedRows); + } + private List[][] toList(Object[][] rows) { List[][] array = new List[rows.length][]; for (int rowIndex = 0; rowIndex < rows.length; rowIndex++) { @@ -620,6 +735,18 @@ public void testPrimitiveBinary() throws SerDeException { return array; } + @Test + public void testListSelected() throws SerDeException { + String[][] schema = { + {"tinyint_list", "array"}, + {"smallint_list", "array"}, + {"int_list", "array"}, + {"bigint_list", "array"}, + }; + int[] selectedRows = new int[] {1, 3, 5}; + initAndSerializeAndDeserializeSelected(schema, toList(INTEGER_ROWS), selectedRows); + } + @Test public void testListInteger() throws SerDeException { String[][] schema = { @@ -692,6 +819,15 @@ public void testListBinary() throws SerDeException { return struct; } + @Test + public void testStructSelected() throws SerDeException { + String[][] schema = { + {"int_struct", "struct"}, + }; + int[] selectedRows = new int[] {1, 3, 5}; + initAndSerializeAndDeserializeSelected(schema, toStruct(INTEGER_ROWS), selectedRows); + } + @Test public void testStructInteger() throws SerDeException { String[][] schema = { @@ -770,6 +906,18 @@ public void testStructBinary() throws SerDeException { return array; } + @Test + public void testMapSelected() throws SerDeException { + String[][] schema = { + {"tinyint_map", "map"}, + {"smallint_map", "map"}, + {"int_map", "map"}, + {"bigint_map", "map"}, + }; + int[] selectedRows = new int[] {1, 3, 5}; + initAndSerializeAndDeserializeSelected(schema, toMap(INTEGER_ROWS), selectedRows); + } + @Test public void testMapInteger() throws SerDeException { String[][] schema = { -- 2.20.1 (Apple Git-117)