diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
index 604cbbc..8645d51 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
@@ -16,7 +16,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -354,34 +353,15 @@ private static GroupType buildProjectedGroupType(
       String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES);
       List<TypeInfo> columnTypesList = getColumnTypes(columnTypes);
 
-      MessageType tableSchema;
-      if (indexAccess) {
-        List<Integer> indexSequence = new ArrayList<Integer>();
-
-        // Generates a sequence list of indexes
-        for(int i = 0; i < columnNamesList.size(); i++) {
-          indexSequence.add(i);
-        }
-
-        tableSchema = getSchemaByIndex(fileSchema, columnNamesList, indexSequence);
-      } else {
-
-        tableSchema = getSchemaByName(fileSchema, columnNamesList, columnTypesList);
-      }
+      MessageType tableSchema =
+          getRequestedSchemaForIndexAccess(indexAccess, columnNamesList, columnTypesList, fileSchema);
 
       contextMetadata.put(HIVE_TABLE_AS_PARQUET_SCHEMA, tableSchema.toString());
       contextMetadata.put(PARQUET_COLUMN_INDEX_ACCESS, String.valueOf(indexAccess));
       this.hiveTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
 
-      Set<String> groupPaths = ColumnProjectionUtils.getNestedColumnPaths(configuration);
-      List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
-      if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
-        MessageType requestedSchemaByUser = getProjectedSchema(tableSchema, columnNamesList,
-            indexColumnsWanted, groupPaths);
-        return new ReadContext(requestedSchemaByUser, contextMetadata);
-      } else {
-        return new ReadContext(tableSchema, contextMetadata);
-      }
+      return new ReadContext(getRequestedPrunedSchema(columnNamesList, tableSchema, configuration),
+          contextMetadata);
     } else {
       contextMetadata.put(HIVE_TABLE_AS_PARQUET_SCHEMA, fileSchema.toString());
       return new ReadContext(fileSchema, contextMetadata);
@@ -389,6 +369,67 @@
   }
 
   /**
+   * Computes the requested Parquet schema for the vectorized code path.
+   * @param indexAccess whether columns are resolved by position rather than by name
+   * @param columnNamesList the Hive column names
+   * @param columnTypesList the Hive column types
+   * @param fileSchema the schema read from the Parquet file footer
+   * @param configuration the configuration carrying the column projection settings
+   * @return the schema restricted to the requested columns
+   */
+  public static MessageType getRequestedSchema(
+      boolean indexAccess,
+      List<String> columnNamesList,
+      List<TypeInfo> columnTypesList,
+      MessageType fileSchema,
+      Configuration configuration) {
+    MessageType tableSchema =
+        getRequestedSchemaForIndexAccess(indexAccess, columnNamesList, columnTypesList, fileSchema);
+
+    List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+    // TODO: duplicates the pruning logic of the init method, since the vectorized reader
+    // path does not support nested column pruning yet. See HIVE-15156.
+    if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
+      return DataWritableReadSupport
+          .getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted);
+    } else {
+      return fileSchema;
+    }
+  }
+
+  private static MessageType getRequestedSchemaForIndexAccess(
+      boolean indexAccess,
+      List<String> columnNamesList,
+      List<TypeInfo> columnTypesList,
+      MessageType fileSchema) {
+    if (indexAccess) {
+      List<Integer> indexSequence = new ArrayList<Integer>();
+
+      // Generates a sequence list of indexes
+      for (int i = 0; i < columnNamesList.size(); i++) {
+        indexSequence.add(i);
+      }
+
+      return getSchemaByIndex(fileSchema, columnNamesList, indexSequence);
+    } else {
+      return getSchemaByName(fileSchema, columnNamesList, columnTypesList);
+    }
+  }
+
+  private static MessageType getRequestedPrunedSchema(
+      List<String> columnNamesList,
+      MessageType fileSchema,
+      Configuration configuration) {
+    Set<String> groupPaths = ColumnProjectionUtils.getNestedColumnPaths(configuration);
+    List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+    if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
+      return getProjectedSchema(fileSchema, columnNamesList, indexColumnsWanted, groupPaths);
+    } else {
+      return fileSchema;
+    }
+  }
+
+  /**
    *
    * It creates the hive read support to interpret data from parquet to hive
    *
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 0977759..9c75f1d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -254,29 +254,9 @@ public void initialize(
     }
 
     this.fileSchema = footer.getFileMetaData().getSchema();
 
-    MessageType tableSchema;
-    if (indexAccess) {
-      List<Integer> indexSequence = new ArrayList<>();
-
-      // Generates a sequence list of indexes
-      for(int i = 0; i < columnNamesList.size(); i++) {
-        indexSequence.add(i);
-      }
-
-      tableSchema = DataWritableReadSupport.getSchemaByIndex(fileSchema, columnNamesList,
-          indexSequence);
-    } else {
-      tableSchema = DataWritableReadSupport.getSchemaByName(fileSchema, columnNamesList,
-          columnTypesList);
-    }
-    indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
-    if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
-      requestedSchema =
-          DataWritableReadSupport.getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted);
-    } else {
-      requestedSchema = fileSchema;
-    }
+    requestedSchema = DataWritableReadSupport
+        .getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration);
 
     Path path = wrapPathForCache(file, cacheKey, configuration, blocks);
     this.reader = new ParquetFileReader(
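
Note (not part of the patch): both the row-mode init() path and the vectorized reader now funnel through the new DataWritableReadSupport.getRequestedSchema() entry point instead of duplicating the index/name schema-selection logic. Below is a minimal caller-side sketch of that entry point. The schema string, column names/types, and projection setup are invented for illustration, and it assumes hive-exec and parquet-hadoop with Hive 2.x package names on the classpath; only the names taken from this patch are authoritative.

// Sketch only: exercises the consolidated entry point added by this patch.
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class RequestedSchemaSketch {
  public static void main(String[] args) {
    // Stand-in for the schema normally read from the Parquet file footer.
    MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message hive_schema { optional int32 id; optional binary name; optional double score; }");

    List<String> columnNames = Arrays.asList("id", "name", "score");
    List<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString("int:string:double");

    // Project columns 0 and 2, the way Hive's projection pushdown would.
    Configuration conf = new Configuration();
    ColumnProjectionUtils.appendReadColumns(conf, Arrays.asList(0, 2));
    // Set explicitly so isReadAllColumns(conf) is false for this sketch.
    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);

    // indexAccess=false resolves columns by name (the getSchemaByName branch);
    // indexAccess=true would build a positional index sequence instead.
    MessageType requested = DataWritableReadSupport.getRequestedSchema(
        false, columnNames, columnTypes, fileSchema, conf);

    // Expect a schema containing only id and score.
    System.out.println(requested);
  }
}

One upshot of extracting getRequestedSchemaForIndexAccess() and getRequestedPrunedSchema() is that once the vectorized path gains nested column pruning (HIVE-15156), only the TODO branch in getRequestedSchema() has to change; the index/name resolution stays in one place.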