diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 4303ca9959..1b546d6593 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -61,6 +61,7 @@
 import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
@@ -97,6 +98,7 @@
   private VectorizedRowBatchCtx rbCtx;
   private Object[] partitionValues;
   private Path cacheFsPath;
+  private static final int MAP_DEFINITION_LEVEL_MAX = 5;

   /**
    * For each request column, the reader to read this column. This is NULL if this column
@@ -507,7 +509,30 @@ private VectorizedColumnReader buildVectorizedParquetReader(
       throw new RuntimeException(
           "Failed to find related Parquet column descriptor with type " + type);
     }
-    List<Type> kvTypes = type.asGroupType().getFields();
+
+    // Handle the different map definitions in Parquet, e.g.:
+    // a definition with 1 group:
+    //   repeated group map (MAP_KEY_VALUE)
+    //     {required binary key (UTF8); optional binary value (UTF8);}
+    // a definition with 2 groups:
+    //   optional group m1 (MAP) {
+    //     repeated group map (MAP_KEY_VALUE)
+    //       {required binary key (UTF8); optional binary value (UTF8);}
+    //   }
+    int nestGroup = 0;
+    GroupType groupType = type.asGroupType();
+    // If the group has two fields, they are the key and value types; otherwise keep
+    // descending into the nested group, giving up after MAP_DEFINITION_LEVEL_MAX levels.
+    while (groupType.getFieldCount() < 2) {
+      if (nestGroup > MAP_DEFINITION_LEVEL_MAX) {
+        throw new RuntimeException(
+            "More than " + MAP_DEFINITION_LEVEL_MAX + " levels found in the Map definition; "
+                + "failed to get the field types for Map with type " + type);
+      }
+      groupType = groupType.getFields().get(0).asGroupType();
+      nestGroup++;
+    }
+    List<Type> kvTypes = groupType.getFields();
     VectorizedListColumnReader keyListColumnReader = new VectorizedListColumnReader(
         descriptors.get(0), pages.getPageReader(descriptors.get(0)), skipTimestampConversion,
         kvTypes.get(0));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java
index c33e8ab4a9..185dfbb4cc 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java
@@ -56,6 +56,8 @@ protected static void writeMapData(ParquetWriter<Group> writer, boolean isDictio
       int mapSize = i % mapMaxSize + 1;
       if (!isNull) {
+        // map_field is used to test the multiple-level map definition
+        Group multipleLevelGroup = group.addGroup("map_field");
         for (int j = 0; j < mapSize; j++) {
           int intValForMap = getIntValue(isDictionaryEncoding, mapElementIndex);
           long longValForMap = getLongValue(isDictionaryEncoding, mapElementIndex);
@@ -74,6 +76,8 @@
               .append("value", binaryValForMap);
           group.addGroup("map_decimal").append("key", decimalValForMap)
               .append("value", decimalValForMap);
+          multipleLevelGroup.addGroup("map").append("key", binaryValForMap)
+              .append("value", binaryValForMap);
           mapElementIndex++;
         }
       }
@@ -160,6 +164,14 @@ public void testRepeateMapRead() throws Exception {
     removeFile();
   }

+  @Test
+  public void testMultipleDefinitionMapRead() throws Exception {
+    removeFile();
+    writeMapData(initWriterFromFile(), false, 1023);
+    testMapRead(false, "multipleLevel", 1023);
+    removeFile();
+  }
+
   private void testMapReadAllType(boolean isDictionaryEncoding, int elementNum) throws Exception {
     testMapRead(isDictionaryEncoding, "int", elementNum);
     testMapRead(isDictionaryEncoding, "long", elementNum);
@@ -267,6 +279,9 @@ private void setTypeConfiguration(String type, Configuration conf) {
     } else if ("decimal".equals(type)) {
       conf.set(IOConstants.COLUMNS, "map_decimal");
       conf.set(IOConstants.COLUMNS_TYPES, "map<decimal(5,2),decimal(5,2)>");
+    } else if ("multipleLevel".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "map_field");
+      conf.set(IOConstants.COLUMNS_TYPES, "map<string,string>");
     }
   }

@@ -291,6 +306,15 @@ private String getSchema(String type) {
       case "decimal":
         return String.format(schemaFormat, "decimal", "binary", "(DECIMAL(5,2))",
             "binary", "(DECIMAL(5,2))");
+      case "multipleLevel":
+        return "message hive_schema {\n" +
+            "optional group map_field (MAP) {\n" +
+            "  repeated group map (MAP_KEY_VALUE) {\n" +
+            "    required binary key;\n" +
+            "    optional binary value;\n" +
+            "  }\n" +
+            "}\n" +
+            "}\n";
       default:
         throw new RuntimeException("Unsupported type for TestVectorizedMapColumnReader!");
     }
@@ -310,7 +334,7 @@ private void assertValue(String type, ColumnVector childVector, boolean isDictio
     } else if ("float".equals(type)) {
       assertEquals(getFloatValue(isDictionaryEncoding, valueIndex),
           ((DoubleColumnVector)childVector).vector[position], 0);
-    } else if ("binary".equals(type)) {
+    } else if ("binary".equals(type) || "multipleLevel".equals(type)) {
       String actual = new String(ArrayUtils
.subarray(((BytesColumnVector)childVector).vector[position], ((BytesColumnVector)childVector).start[position],
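
Note: as a quick illustration of the unwrapping loop added above, here is a minimal
standalone sketch (not part of the patch; the class name and the resolveKeyValue
helper are hypothetical) that parses both map layouts with parquet-mr's
MessageTypeParser and shows that each resolves to the same key/value fields:

import java.util.List;

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Type;

public class MapSchemaUnwrapSketch {
  private static final int MAP_DEFINITION_LEVEL_MAX = 5;

  // Mirrors the patch: descend through single-field wrapper groups until the
  // group holding both the key and value fields is reached.
  static List<Type> resolveKeyValue(Type mapType) {
    int nestGroup = 0;
    GroupType groupType = mapType.asGroupType();
    while (groupType.getFieldCount() < 2) {
      if (nestGroup > MAP_DEFINITION_LEVEL_MAX) {
        throw new RuntimeException("Map definition nested deeper than "
            + MAP_DEFINITION_LEVEL_MAX + " levels: " + mapType);
      }
      groupType = groupType.getFields().get(0).asGroupType();
      nestGroup++;
    }
    return groupType.getFields();
  }

  public static void main(String[] args) {
    // Two-group layout: a MAP wrapper around the repeated MAP_KEY_VALUE group.
    MessageType twoGroups = MessageTypeParser.parseMessageType(
        "message m {\n"
            + "  optional group map_field (MAP) {\n"
            + "    repeated group map (MAP_KEY_VALUE) {\n"
            + "      required binary key (UTF8);\n"
            + "      optional binary value (UTF8);\n"
            + "    }\n"
            + "  }\n"
            + "}");
    // One-group layout: the repeated group itself carries key and value.
    MessageType oneGroup = MessageTypeParser.parseMessageType(
        "message m {\n"
            + "  repeated group map_field (MAP_KEY_VALUE) {\n"
            + "    required binary key (UTF8);\n"
            + "    optional binary value (UTF8);\n"
            + "  }\n"
            + "}");
    // Both resolve to the same [key, value] field pair.
    System.out.println(resolveKeyValue(twoGroups.getType("map_field")));
    System.out.println(resolveKeyValue(oneGroup.getType("map_field")));
  }
}

Because both layouts reduce to the same key/value pair, buildVectorizedParquetReader
can construct the key and value VectorizedListColumnReaders regardless of how many
wrapper groups the writer emitted.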