diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
index 57ae7a9..d1fe3a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.util.StringUtils;
 
 import parquet.column.ColumnDescriptor;
+import parquet.hadoop.api.InitContext;
 import parquet.hadoop.api.ReadSupport;
 import parquet.io.api.RecordMaterializer;
 import parquet.schema.MessageType;
@@ -65,14 +66,13 @@
    *
    * It creates the readContext for Parquet side with the requested schema during the init phase.
    *
-   * @param configuration needed to get the wanted columns
-   * @param keyValueMetaData // unused
-   * @param fileSchema parquet file schema
+   * @param context the InitContext carrying the configuration and the Parquet file schema
    * @return the parquet ReadContext
    */
   @Override
-  public parquet.hadoop.api.ReadSupport.ReadContext init(final Configuration configuration,
-      final Map<String, String> keyValueMetaData, final MessageType fileSchema) {
+  public parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) {
+    Configuration configuration = context.getConfiguration();
+    MessageType fileSchema = context.getFileSchema();
     final String columns = configuration.get(IOConstants.COLUMNS);
     final Map<String, String> contextMetadata = new HashMap<String, String>();
     final boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
@@ -117,7 +117,7 @@
         if (idx < listColumns.size()) {
           String col = listColumns.get(idx);
           if (indexAccess) {
-            typeListWanted.add(fileSchema.getFields().get(idx));
+            typeListWanted.add(fileSchema.getFields().get(idx));
           } else {
             col = col.toLowerCase();
             if (lowerCaseFileSchemaColumns.containsKey(col)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index a261996..d9d6015 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -15,7 +15,12 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +47,7 @@
 import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.ParquetInputFormat;
 import parquet.hadoop.ParquetInputSplit;
+import parquet.hadoop.api.InitContext;
 import parquet.hadoop.api.ReadSupport.ReadContext;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.FileMetaData;
@@ -243,8 +249,8 @@ protected ParquetInputSplit getSplit(
       final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
       final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
 
-      final ReadContext readContext = new DataWritableReadSupport()
-          .init(jobConf, fileMetaData.getKeyValueMetaData(), fileMetaData.getSchema());
+      final ReadContext readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
+          toSetMultiMap(fileMetaData.getKeyValueMetaData()), fileMetaData.getSchema()));
       schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
           .get(DataWritableReadSupport.HIVE_SCHEMA_KEY)).getFieldCount();
       final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
@@ -278,4 +284,14 @@ protected ParquetInputSplit getSplit(
     }
     return split;
   }
+
+  private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
+    Map<K, Set<V>> setMultiMap = new HashMap<K, Set<V>>();
+    for (Map.Entry<K, V> entry : map.entrySet()) {
+      Set<V> set = new HashSet<V>();
+      set.add(entry.getValue());
+      setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
+    }
+    return Collections.unmodifiableMap(setMultiMap);
+  }
 }
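
Note (not part of the patch): the new InitContext-based init path needs the key/value metadata as Map<String, Set<String>>, while FileMetaData.getKeyValueMetaData() returns a plain Map<String, String>; the toSetMultiMap helper bridges the two by wrapping each value in an unmodifiable single-element set. The standalone sketch below mirrors that conversion using only JDK collections; the class name and the sample metadata key are hypothetical and chosen purely for illustration.

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ToSetMultiMapSketch {

  // Mirrors the private helper added to ParquetRecordReaderWrapper: each value is
  // wrapped in an unmodifiable one-element Set, and the outer map is made unmodifiable.
  static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
    Map<K, Set<V>> setMultiMap = new HashMap<K, Set<V>>();
    for (Map.Entry<K, V> entry : map.entrySet()) {
      Set<V> set = new HashSet<V>();
      set.add(entry.getValue());
      setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
    }
    return Collections.unmodifiableMap(setMultiMap);
  }

  public static void main(String[] args) {
    // Hypothetical metadata of the shape returned by FileMetaData.getKeyValueMetaData().
    Map<String, String> keyValueMetaData = new HashMap<String, String>();
    keyValueMetaData.put("example.key", "example.value");

    Map<String, Set<String>> converted = toSetMultiMap(keyValueMetaData);
    System.out.println(converted); // prints {example.key=[example.value]}
  }
}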