diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
index 2ad7330..3f8e4d7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
@@ -75,6 +75,7 @@
       final Map<String, String> keyValueMetaData, final MessageType fileSchema) {
     final String columns = configuration.get(IOConstants.COLUMNS);
     final Map<String, String> contextMetadata = new HashMap<String, String>();
+    final boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
     if (columns != null) {
       final List<String> listColumns = getColumns(columns);
       final Map<String, String> lowerCaseFileSchemaColumns = new HashMap<String, String>();
@@ -82,45 +83,50 @@
         lowerCaseFileSchemaColumns.put(c.getPath()[0].toLowerCase(), c.getPath()[0]);
       }
       final List<Type> typeListTable = new ArrayList<Type>();
-      for (String col : listColumns) {
-        col = col.toLowerCase();
-        // listColumns contains partition columns which are metadata only
-        if (lowerCaseFileSchemaColumns.containsKey(col)) {
-          typeListTable.add(fileSchema.getType(lowerCaseFileSchemaColumns.get(col)));
-        } else {
-          // below allows schema evolution
-          typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
+      if (indexAccess) {
+        for (int index = 0; index < listColumns.size(); index++) {
+          // Take columns based on index or pad the field
+          if (index < fileSchema.getFieldCount()) {
+            typeListTable.add(fileSchema.getType(index));
+          } else {
+            // Prefixing with '_mask_' to ensure no conflict with named
+            // columns in the file schema
+            typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_mask_" + listColumns.get(index)));
+          }
+        }
+      } else {
+        for (String col : listColumns) {
+          col = col.toLowerCase();
+          // listColumns contains partition columns which are metadata only
+          if (lowerCaseFileSchemaColumns.containsKey(col)) {
+            typeListTable.add(fileSchema.getType(lowerCaseFileSchemaColumns.get(col)));
+          } else {
+            // below allows schema evolution
+            typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
+          }
         }
       }
       MessageType tableSchema = new MessageType(TABLE_SCHEMA, typeListTable);
       contextMetadata.put(HIVE_SCHEMA_KEY, tableSchema.toString());
-      MessageType requestedSchemaByUser = tableSchema;
       final List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
       final List<Type> typeListWanted = new ArrayList<Type>();
-      final boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
+
      for (final Integer idx : indexColumnsWanted) {
        if (idx < listColumns.size()) {
          String col = listColumns.get(idx);
          if (indexAccess) {
-           typeListWanted.add(tableSchema.getType(col));
+           typeListWanted.add(fileSchema.getFields().get(idx));
          } else {
            col = col.toLowerCase();
            if (lowerCaseFileSchemaColumns.containsKey(col)) {
              typeListWanted.add(tableSchema.getType(lowerCaseFileSchemaColumns.get(col)));
-           } else {
-             // should never occur?
-             String msg = "Column " + col + " at index " + idx + " does not exist in " +
-                 lowerCaseFileSchemaColumns;
-             throw new IllegalStateException(msg);
            }
          }
        }
      }
-     requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
-         typeListWanted), fileSchema, configuration);
-
+     MessageType requestedSchemaByUser = new MessageType(fileSchema.getName(), typeListWanted);
      return new ReadContext(requestedSchemaByUser, contextMetadata);
    } else {
      contextMetadata.put(HIVE_SCHEMA_KEY, fileSchema.toString());
@@ -147,26 +153,7 @@
      throw new IllegalStateException("ReadContext not initialized properly. " +
        "Don't know the Hive Schema.");
    }
-   final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
-       parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
+   final MessageType tableSchema = MessageTypeParser.parseMessageType(metadata.get(HIVE_SCHEMA_KEY));
    return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
  }
-
- /**
-  * Determine the file column names based on the position within the requested columns and
-  * use that as the requested schema.
-  */
- private MessageType resolveSchemaAccess(MessageType requestedSchema, MessageType fileSchema,
-     Configuration configuration) {
-   if (configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false)) {
-     final List<String> listColumns = getColumns(configuration.get(IOConstants.COLUMNS));
-     List<Type> requestedTypes = new ArrayList<Type>();
-     for (Type t : requestedSchema.getFields()) {
-       int index = listColumns.indexOf(t.getName());
-       requestedTypes.add(fileSchema.getType(index));
-     }
-     requestedSchema = new MessageType(requestedSchema.getName(), requestedTypes);
-   }
-   return requestedSchema;
- }
-}
\ No newline at end of file
+}
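For context, the index-access path added above maps the i-th table column to the i-th file column and pads any table columns beyond the file's width with placeholder types, instead of resolving columns by lower-cased name. Below is a minimal, self-contained sketch of that mapping, assuming the pre-org.apache parquet.schema packages this version of Hive builds against; the class name, method name, and the "table_schema" literal are illustrative stand-ins, not part of the patch.

import java.util.ArrayList;
import java.util.List;

import parquet.schema.MessageType;
import parquet.schema.PrimitiveType;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
import parquet.schema.Type;
import parquet.schema.Type.Repetition;

public class IndexAccessSketch {

  // Illustrative stand-in for the TABLE_SCHEMA constant in DataWritableReadSupport.
  private static final String TABLE_SCHEMA = "table_schema";

  // Builds the table schema by column position: position i of the table takes
  // the type at position i of the file schema, and positions past the end of
  // the file are padded with optional binaries. The '_mask_' prefix mirrors
  // the patch's way of avoiding collisions with real column names in the file.
  static MessageType tableSchemaByIndex(List<String> listColumns, MessageType fileSchema) {
    final List<Type> typeListTable = new ArrayList<Type>();
    for (int index = 0; index < listColumns.size(); index++) {
      if (index < fileSchema.getFieldCount()) {
        typeListTable.add(fileSchema.getType(index));
      } else {
        typeListTable.add(new PrimitiveType(Repetition.OPTIONAL,
            PrimitiveTypeName.BINARY, "_mask_" + listColumns.get(index)));
      }
    }
    return new MessageType(TABLE_SCHEMA, typeListTable);
  }
}

Readers opt into this behavior per job: assuming the PARQUET_COLUMN_INDEX_ACCESS key resolves to "parquet.column.index.access", setting configuration.setBoolean("parquet.column.index.access", true) selects the positional path, while the name-based resolution above remains the default.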