Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.9.0
Environment:
parquet-mr in CDH 5.14.2 (base version is 1.5.0)
parquet-mr 1.9.0
Description
I got a Parquet file written by Hive with this schema:
file schema: hive_schema
--------------------------------------------------------------------------------
taxi_id:          OPTIONAL BINARY O:UTF8 R:0 D:1
date:             OPTIONAL BINARY O:UTF8 R:0 D:1
start_time:       OPTIONAL INT64 R:0 D:1
end_time:         OPTIONAL INT64 R:0 D:1
min_lat_wgs:      OPTIONAL DOUBLE R:0 D:1
min_lng_wgs:      OPTIONAL DOUBLE R:0 D:1
max_lat_wgs:      OPTIONAL DOUBLE R:0 D:1
max_lng_wgs:      OPTIONAL DOUBLE R:0 D:1
first_lat_wgs:    OPTIONAL DOUBLE R:0 D:1
first_lng_wgs:    OPTIONAL DOUBLE R:0 D:1
last_lat_wgs:     OPTIONAL DOUBLE R:0 D:1
last_lng_wgs:     OPTIONAL DOUBLE R:0 D:1
gps_log:          OPTIONAL F:1
.bag:             REPEATED F:1
..array_element:  OPTIONAL F:6
...timestamp:     OPTIONAL INT64 R:1 D:4
...lat_wgs:       OPTIONAL DOUBLE R:1 D:4
...lng_wgs:       OPTIONAL DOUBLE R:1 D:4
...item:          OPTIONAL INT32 R:1 D:4
...direction:     OPTIONAL INT32 R:1 D:4
...vflag:         OPTIONAL INT32 R:1 D:4
I want to read it with parquet-avro and use `AvroReadSupport.setRequestedProjection` to select a subset of the fields, with this Avro schema:
{
  "type": "record",
  "name": "test",
  "fields": [
    { "name": "taxi_id", "type": ["null", "string"] },
    {
      "name": "gps_log",
      "type": [{
        "type": "array",
        "items": ["null", {
          "name": "point",
          "type": "record",
          "fields": [
            { "name": "lat_wgs", "type": ["null", "double"] },
            { "name": "lng_wgs", "type": ["null", "double"] }
          ]
        }]
      }],
      "default": "null"
    }
  ]
}
I tried to read the data with this code:
Configuration conf = new Configuration();
AvroReadSupport.setRequestedProjection(conf, schema);
conf.setBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, false);
conf.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false);
AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(conf, path);
And I got this error:
Exception in thread "main" parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: required group gps_log (LIST) {
  repeated group list {
    optional group element {
      optional double lat_wgs;
      optional double lng_wgs;
    }
  }
} != optional group gps_log (LIST) {
  repeated group bag {
    optional group array_element {
      optional int64 timestamp;
      optional double lat_wgs;
      optional double lng_wgs;
      optional int32 item;
      optional int32 direction;
      optional int32 vflag;
    }
  }
}
The nullability of `gps_log` is not the real cause: if I mark it nullable in the Avro schema, I always get a null value for it.
I tried adding some code to `AvroSchemaConverter.convertField`:
diff --git a/AvroSchemaConverter.java b/AvroSchemaConverterNew.java
index 0b8076b..48b56dd 100644
--- a/AvroSchemaConverter.java
+++ b/AvroSchemaConverterNew.java
@@ -50,12 +50,17 @@ public class AvroSchemaConverter {
       "parquet.avro.add-list-element-records";
   private static final boolean ADD_LIST_ELEMENT_RECORDS_DEFAULT = true;
 
+  public static final String READ_HIVE_WRITE_FILE =
+      "parquet.avro.read-hive-write-file";
+  private static final boolean READ_HIVE_WRITE_FILE_DEFAULT = false;
+
   private final boolean assumeRepeatedIsListElement;
   private final boolean writeOldListStructure;
 
   public AvroSchemaConverter() {
     this.assumeRepeatedIsListElement = ADD_LIST_ELEMENT_RECORDS_DEFAULT;
     this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
+    this.isReadHiveWriteFileDefault = READ_HIVE_WRITE_FILE_DEFAULT;
   }
 
   public AvroSchemaConverter(Configuration conf) {
@@ -63,6 +68,9 @@ public class AvroSchemaConverter {
         ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
     this.writeOldListStructure = conf.getBoolean(
         WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT);
+    this.isReadHiveWriteFileDefault = conf.getBoolean(
+        READ_HIVE_WRITE_FILE, READ_HIVE_WRITE_FILE_DEFAULT
+    );
   }
 
   /**
@@ -137,7 +145,14 @@ public class AvroSchemaConverter {
       if (writeOldListStructure) {
         return ConversionPatterns.listType(repetition, fieldName,
             convertField("array", schema.getElementType(), REPEATED));
-      } else {
+      } else if (isReadHiveWriteFileDefault) {
+        Type elementType = convertField("array_element", schema.getElementType());
+        return new GroupType(
+            repetition,
+            fieldName,
+            LIST,
+            new GroupType(Type.Repetition.REPEATED, "bag", elementType));
+      } else {
         return ConversionPatterns.listOfElements(repetition, fieldName,
             convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType()));
       }
With this change, the file can be read.
So is this a compatibility problem in parquet-avro, or did I just miss some configuration?
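A toy sketch of why the group names seem to matter, assuming the requested projection is matched against the file schema by field name along each column path. This is plain Java, not parquet-mr code: the class `ListPathMismatch` and its methods are hypothetical and exist only for illustration. It builds the leaf column paths for `gps_log` under the default naming (`list` / `element`) and under the Hive naming from the file (`bag` / `array_element`), then lists the requested paths that have no same-named column in the file.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy model (not parquet-mr code) of name-based column path matching. */
final class ListPathMismatch {

    /** Builds leaf paths of the form field.repeatedGroup.elementGroup.leaf. */
    static Set<String> listLeafPaths(String field, String repeatedGroup,
                                     String elementGroup, String... leaves) {
        Set<String> paths = new LinkedHashSet<>();
        for (String leaf : leaves) {
            paths.add(String.join(".", field, repeatedGroup, elementGroup, leaf));
        }
        return paths;
    }

    /** Returns the requested paths that have no same-named column in the file. */
    static List<String> missingInFile(Set<String> requested, Set<String> file) {
        List<String> missing = new ArrayList<>();
        for (String path : requested) {
            if (!file.contains(path)) {
                missing.add(path);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Column paths as they appear in the Hive-written file.
        Set<String> file = listLeafPaths("gps_log", "bag", "array_element",
                "timestamp", "lat_wgs", "lng_wgs", "item", "direction", "vflag");
        // Projection as converted with the default list naming.
        Set<String> defaultNaming = listLeafPaths("gps_log", "list", "element",
                "lat_wgs", "lng_wgs");
        // Projection as converted with the Hive naming used in the patch.
        Set<String> hiveNaming = listLeafPaths("gps_log", "bag", "array_element",
                "lat_wgs", "lng_wgs");

        System.out.println("default naming, missing: " + missingInFile(defaultNaming, file));
        System.out.println("Hive naming, missing:    " + missingInFile(hiveNaming, file));
    }
}
```

Under this assumption, both paths requested with the default naming are missing from the file, while the Hive naming leaves nothing missing. That would be consistent with `gps_log` always being null when it is marked nullable, and with the patched converter being able to read the data.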