diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
index c100dce..8286ab8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.util.StringUtils;
+import parquet.column.ColumnDescriptor;
 import parquet.hadoop.api.ReadSupport;
 import parquet.io.api.RecordMaterializer;
 import parquet.schema.MessageType;
@@ -46,8 +47,8 @@
   private static final String TABLE_SCHEMA = "table_schema";
   public static final String HIVE_SCHEMA_KEY = "HIVE_TABLE_SCHEMA";
-  public static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.column.index.access";
-
+  public static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.column.index.access";
+
   /**
    * From a string which columns names (including hive column), return a list
    * of string columns
@@ -75,12 +76,16 @@
     final Map contextMetadata = new HashMap();
     if (columns != null) {
       final List listColumns = getColumns(columns);
-
+      final Map lowerCaseFileSchemaColumns = new HashMap();
+      for (ColumnDescriptor c : fileSchema.getColumns()) {
+        lowerCaseFileSchemaColumns.put(c.getPath()[0].toLowerCase(), c.getPath()[0]);
+      }
       final List typeListTable = new ArrayList();
-      for (final String col : listColumns) {
+      for (String col : listColumns) {
+        col = col.toLowerCase();
         // listColumns contains partition columns which are metadata only
-        if (fileSchema.containsField(col)) {
-          typeListTable.add(fileSchema.getType(col));
+        if (lowerCaseFileSchemaColumns.containsKey(col)) {
+          typeListTable.add(fileSchema.getType(lowerCaseFileSchemaColumns.get(col)));
         } else {
           // below allows schema evolution
           typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
@@ -94,9 +99,17 @@
       final List typeListWanted = new ArrayList();
       for (final Integer idx : indexColumnsWanted) {
-        typeListWanted.add(tableSchema.getType(listColumns.get(idx)));
+        String col = listColumns.get(idx).toLowerCase();
+        if (lowerCaseFileSchemaColumns.containsKey(col)) {
+          typeListWanted.add(tableSchema.getType(lowerCaseFileSchemaColumns.get(col)));
+        } else {
+          // should never occur?
+          String msg = "Column " + col + " at index " + idx + " does not exist in "
+              + lowerCaseFileSchemaColumns;
+          throw new IllegalStateException(msg);
+        }
       }
-      requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
+      requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
           typeListWanted), fileSchema, configuration);
       return new ReadContext(requestedSchemaByUser, contextMetadata);
@@ -127,10 +140,9 @@
     }
     final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
         parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
-
     return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
   }
-
+
   /**
    * Determine the file column names based on the position within the requested columns and
    * use that as the requested schema.
@@ -139,17 +151,13 @@ private MessageType resolveSchemaAccess(MessageType requestedSchema, MessageType
       Configuration configuration) {
     if(configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false)) {
       final List listColumns = getColumns(configuration.get(IOConstants.COLUMNS));
-
       List requestedTypes = new ArrayList();
-
       for(Type t : requestedSchema.getFields()) {
         int index = listColumns.indexOf(t.getName());
         requestedTypes.add(fileSchema.getType(index));
       }
-
       requestedSchema = new MessageType(requestedSchema.getName(), requestedTypes);
     }
-
     return requestedSchema;
   }
 }
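The heart of the patch is the lower-case mapping built in the @@ -75,12 +76,16 @@ hunk: Hive hands over lower-cased column names, while Parquet files written by other tools may preserve the original case, so the patch maps each lower-cased name back to the exact file-schema name before looking up its type. Below is a minimal standalone sketch of that technique; the class name and column names are hypothetical, and the real patch builds the map from fileSchema.getColumns() and resolves Parquet types rather than printing names.

import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the case-insensitive lookup introduced by the patch.
// Column names here are hypothetical stand-ins for a real Parquet file schema.
public class CaseInsensitiveColumnLookup {
  public static void main(String[] args) {
    // Names exactly as they appear in the (hypothetical) Parquet file schema.
    String[] fileSchemaColumns = {"UserId", "EventTime", "Payload"};

    // Lower-cased name -> exact file-schema name, mirroring
    // lowerCaseFileSchemaColumns in the patch.
    Map<String, String> lowerCaseFileSchemaColumns = new HashMap<String, String>();
    for (String name : fileSchemaColumns) {
      lowerCaseFileSchemaColumns.put(name.toLowerCase(), name);
    }

    // Hive supplies lower-cased column names.
    for (String requested : new String[] {"userid", "eventtime", "missing_col"}) {
      String exact = lowerCaseFileSchemaColumns.get(requested);
      if (exact != null) {
        // The patch uses the exact name to fetch the type from the file schema.
        System.out.println(requested + " -> " + exact);
      } else {
        // In the patch this branch substitutes an OPTIONAL BINARY type,
        // which is what allows schema evolution for columns absent from the file.
        System.out.println(requested + " -> not in file schema");
      }
    }
  }
}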
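The final hunk's resolveSchemaAccess takes a different route when parquet.column.index.access is enabled: columns are matched by position rather than by name, with each requested field replaced by the file-schema type at the same index in the table's column list. The sketch below illustrates that positional substitution under the same assumptions as above (hypothetical names, strings standing in for Parquet types). Note that, like the patched loop, it assumes every requested name appears in the table column list; indexOf would return -1 otherwise.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of positional (index-based) schema resolution: names in the file
// are ignored, and each requested column is resolved to the file-schema
// field occupying the same position in the table's column list.
public class IndexAccessResolution {
  public static void main(String[] args) {
    List<String> tableColumns = Arrays.asList("id", "name", "amount");
    List<String> fileSchemaFields = Arrays.asList("ID", "FullName", "Amount");

    List<String> resolved = new ArrayList<String>();
    for (String requested : Arrays.asList("name", "amount")) {
      // Plays the role of listColumns.indexOf(t.getName()) in the patch.
      int index = tableColumns.indexOf(requested);
      resolved.add(fileSchemaFields.get(index));
    }
    System.out.println(resolved); // prints [FullName, Amount]
  }
}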