diff --git data/files/parquet_columnar.txt data/files/parquet_columnar.txt
new file mode 100644
index 0000000..5db1282
--- /dev/null
+++ data/files/parquet_columnar.txt
@@ -0,0 +1,21 @@
+1abc00|1|1.0
+1def01|2|1.1
+1ghi02|3|1.2
+1jkl03|1|1.3
+1mno04|2|1.4
+1pqr05|3|1.0
+1stu06|1|1.1
+1vwx07|2|1.2
+1yza08|3|1.3
+1bcd09|1|1.4
+1efg10|2|1.0
+1hij11|3|1.1
+1klm12|1|1.2
+1nop13|2|1.3
+1qrs14|3|1.4
+1tuv15|1|1.0
+1wxy16|2|1.1
+1zab17|3|1.2
+1cde18|1|1.3
+1fgh19|2|1.4
+1ijk20|3|1.0
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 7c175aa..45c0369 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -92,6 +92,7 @@
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask;
@@ -3670,7 +3671,8 @@ private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException {
           MetadataTypedColumnsetSerDe.class.getName())
           && !tbl.getSerializationLib().equals(LazySimpleSerDe.class.getName())
           && !tbl.getSerializationLib().equals(ColumnarSerDe.class.getName())
-          && !tbl.getSerializationLib().equals(DynamicSerDe.class.getName())) {
+          && !tbl.getSerializationLib().equals(DynamicSerDe.class.getName())
+          && !tbl.getSerializationLib().equals(ParquetHiveSerDe.class.getName())) {
         throw new HiveException(ErrorMsg.CANNOT_REPLACE_COLUMNS, alterTbl.getOldName());
       }
       tbl.getTTable().getSd().setCols(alterTbl.getNewCols());
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
index 94c780a..c100dce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
@@ -46,7 +46,8 @@
 
   private static final String TABLE_SCHEMA = "table_schema";
   public static final String HIVE_SCHEMA_KEY = "HIVE_TABLE_SCHEMA";
-
+  public static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.column.index.access";
+
   /**
    * From a string which columns names (including hive column), return a list
    * of string columns
@@ -95,7 +96,8 @@
       for (final Integer idx : indexColumnsWanted) {
         typeListWanted.add(tableSchema.getType(listColumns.get(idx)));
       }
-      requestedSchemaByUser = new MessageType(fileSchema.getName(), typeListWanted);
+      requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
+          typeListWanted), fileSchema, configuration);
 
       return new ReadContext(requestedSchemaByUser, contextMetadata);
     } else {
@@ -123,8 +125,31 @@
       throw new IllegalStateException("ReadContext not initialized properly. " +
         "Don't know the Hive Schema.");
     }
-    final MessageType tableSchema = MessageTypeParser.
-      parseMessageType(metadata.get(HIVE_SCHEMA_KEY));
+    final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
+        parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
+
     return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
   }
+
+  /**
+   * Determine which file columns to read based on the position of the requested columns,
+   * and use those to build the requested schema.
+   */
+  private MessageType resolveSchemaAccess(MessageType requestedSchema, MessageType fileSchema,
+      Configuration configuration) {
+    if (configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false)) {
+      final List<String> listColumns = getColumns(configuration.get(IOConstants.COLUMNS));
+
+      List<Type> requestedTypes = new ArrayList<Type>();
+
+      for (Type t : requestedSchema.getFields()) {
+        int index = listColumns.indexOf(t.getName());
+        requestedTypes.add(fileSchema.getType(index));
+      }
+
+      requestedSchema = new MessageType(requestedSchema.getName(), requestedTypes);
+    }
+
+    return requestedSchema;
+  }
 }
diff --git ql/src/test/queries/clientpositive/parquet_columnar.q ql/src/test/queries/clientpositive/parquet_columnar.q
new file mode 100644
index 0000000..4303d3e
--- /dev/null
+++ ql/src/test/queries/clientpositive/parquet_columnar.q
@@ -0,0 +1,27 @@
+set parquet.column.index.access=true;
+
+DROP TABLE IF EXISTS parquet_columnar_access_stage;
+DROP TABLE IF EXISTS parquet_columnar_access;
+DROP TABLE IF EXISTS parquet_columnar_renamed;
+
+CREATE TABLE parquet_columnar_access_stage (
+    s string,
+    i int,
+    f float
+  ) ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '|';
+
+CREATE TABLE parquet_columnar_access (
+    s string,
+    i int,
+    f float
+  ) STORED AS PARQUET;
+
+LOAD DATA LOCAL INPATH '../../data/files/parquet_columnar.txt' OVERWRITE INTO TABLE parquet_columnar_access_stage;
+
+INSERT OVERWRITE TABLE parquet_columnar_access SELECT * FROM parquet_columnar_access_stage;
+SELECT * FROM parquet_columnar_access;
+
+ALTER TABLE parquet_columnar_access REPLACE COLUMNS (s1 string, i1 int, f1 float);
+
+SELECT * FROM parquet_columnar_access;
diff --git ql/src/test/results/clientpositive/parquet_columnar.q.out ql/src/test/results/clientpositive/parquet_columnar.q.out
new file mode 100644
index 0000000..5ccf37a
--- /dev/null
+++ ql/src/test/results/clientpositive/parquet_columnar.q.out
@@ -0,0 +1,138 @@
+PREHOOK: query: DROP TABLE IF EXISTS parquet_columnar_access_stage
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS parquet_columnar_access_stage
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS parquet_columnar_access
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS parquet_columnar_access
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS parquet_columnar_renamed
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS parquet_columnar_renamed
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE parquet_columnar_access_stage (
+    s string,
+    i int,
+    f float
+  ) ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '|'
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: CREATE TABLE parquet_columnar_access_stage (
+    s string,
+    i int,
+    f float
+  ) ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '|'
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_columnar_access_stage
+PREHOOK: query: CREATE TABLE parquet_columnar_access (
+    s string,
+    i int,
+    f float
+  ) STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: CREATE TABLE parquet_columnar_access (
+    s string,
+    i int,
+    f float
+  ) STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_columnar_access
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/parquet_columnar.txt' OVERWRITE INTO TABLE parquet_columnar_access_stage
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@parquet_columnar_access_stage
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/parquet_columnar.txt' OVERWRITE INTO TABLE parquet_columnar_access_stage
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@parquet_columnar_access_stage
+PREHOOK: query: INSERT OVERWRITE TABLE parquet_columnar_access SELECT * FROM parquet_columnar_access_stage
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_columnar_access_stage
+PREHOOK: Output: default@parquet_columnar_access
+POSTHOOK: query: INSERT OVERWRITE TABLE parquet_columnar_access SELECT * FROM parquet_columnar_access_stage
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_columnar_access_stage
+POSTHOOK: Output: default@parquet_columnar_access
+POSTHOOK: Lineage: parquet_columnar_access.f SIMPLE [(parquet_columnar_access_stage)parquet_columnar_access_stage.FieldSchema(name:f, type:float, comment:null), ]
+POSTHOOK: Lineage: parquet_columnar_access.i SIMPLE [(parquet_columnar_access_stage)parquet_columnar_access_stage.FieldSchema(name:i, type:int, comment:null), ]
+POSTHOOK: Lineage: parquet_columnar_access.s SIMPLE [(parquet_columnar_access_stage)parquet_columnar_access_stage.FieldSchema(name:s, type:string, comment:null), ]
+PREHOOK: query: SELECT * FROM parquet_columnar_access
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_columnar_access
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM parquet_columnar_access
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_columnar_access
+#### A masked pattern was here ####
+POSTHOOK: Lineage: parquet_columnar_access.f SIMPLE [(parquet_columnar_access_stage)parquet_columnar_access_stage.FieldSchema(name:f, type:float, comment:null), ]
+POSTHOOK: Lineage: parquet_columnar_access.i SIMPLE [(parquet_columnar_access_stage)parquet_columnar_access_stage.FieldSchema(name:i, type:int, comment:null), ]
+POSTHOOK: Lineage: parquet_columnar_access.s SIMPLE [(parquet_columnar_access_stage)parquet_columnar_access_stage.FieldSchema(name:s, type:string, comment:null), ]
+1abc00 1 1.0
+1def01 2 1.1
+1ghi02 3 1.2
+1jkl03 1 1.3
+1mno04 2 1.4
+1pqr05 3 1.0
+1stu06 1 1.1
+1vwx07 2 1.2
+1yza08 3 1.3
+1bcd09 1 1.4
+1efg10 2 1.0
+1hij11 3 1.1
+1klm12 1 1.2
+1nop13 2 1.3
+1qrs14 3 1.4
+1tuv15 1 1.0
+1wxy16 2 1.1
+1zab17 3 1.2
+1cde18 1 1.3
+1fgh19 2 1.4
+1ijk20 3 1.0
+PREHOOK: query: ALTER TABLE parquet_columnar_access REPLACE COLUMNS (s1 string, i1 int, f1 float)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@parquet_columnar_access
+PREHOOK: Output: default@parquet_columnar_access
+POSTHOOK: query: ALTER TABLE parquet_columnar_access REPLACE COLUMNS (s1 string, i1 int, f1 float)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@parquet_columnar_access
+POSTHOOK: Output: default@parquet_columnar_access
+POSTHOOK: Lineage: parquet_columnar_access.f SIMPLE [(parquet_columnar_access_stage)parquet_columnar_access_stage.FieldSchema(name:f, type:float, comment:null), ]
+POSTHOOK: Lineage: parquet_columnar_access.i SIMPLE [(parquet_columnar_access_stage)parquet_columnar_access_stage.FieldSchema(name:i, type:int, comment:null), ]
+POSTHOOK: Lineage: parquet_columnar_access.s SIMPLE [(parquet_columnar_access_stage)parquet_columnar_access_stage.FieldSchema(name:s, type:string, comment:null), ]
+PREHOOK: query: SELECT * FROM parquet_columnar_access
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_columnar_access
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM parquet_columnar_access
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_columnar_access
+#### A masked pattern was here ####
+POSTHOOK: Lineage: parquet_columnar_access.f SIMPLE [(parquet_columnar_access_stage)parquet_columnar_access_stage.FieldSchema(name:f, type:float, comment:null), ]
+POSTHOOK: Lineage: parquet_columnar_access.i SIMPLE [(parquet_columnar_access_stage)parquet_columnar_access_stage.FieldSchema(name:i, type:int, comment:null), ]
+POSTHOOK: Lineage: parquet_columnar_access.s SIMPLE [(parquet_columnar_access_stage)parquet_columnar_access_stage.FieldSchema(name:s, type:string, comment:null), ]
+1abc00 1 1.0
+1def01 2 1.1
+1ghi02 3 1.2
+1jkl03 1 1.3
+1mno04 2 1.4
+1pqr05 3 1.0
+1stu06 1 1.1
+1vwx07 2 1.2
+1yza08 3 1.3
+1bcd09 1 1.4
+1efg10 2 1.0
+1hij11 3 1.1
+1klm12 1 1.2
+1nop13 2 1.3
+1qrs14 3 1.4
+1tuv15 1 1.0
+1wxy16 2 1.1
+1zab17 3 1.2
+1cde18 1 1.3
+1fgh19 2 1.4
+1ijk20 3 1.0
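
Note on the resolveSchemaAccess change above: with parquet.column.index.access enabled, each requested column is matched to the file column at the same position in the Hive schema rather than by name, which is why REPLACE COLUMNS can rename columns (s -> s1, i -> i1, f -> f1 in the test) without rewriting the Parquet files. A minimal standalone sketch of that positional lookup, in plain Java; IndexAccessSketch and resolveByIndex are hypothetical names for illustration, not part of the patch:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IndexAccessSketch {

  // Map each requested (possibly renamed) Hive column to the file column at
  // the same position, mirroring resolveSchemaAccess above. Assumes every
  // requested name appears in hiveColumns and that the Hive and file schemas
  // share the same column order, as the patch does.
  static List<String> resolveByIndex(List<String> requested,
      List<String> hiveColumns, List<String> fileColumns) {
    List<String> resolved = new ArrayList<String>();
    for (String name : requested) {
      int index = hiveColumns.indexOf(name);  // position in the Hive schema
      resolved.add(fileColumns.get(index));   // same position in the file
    }
    return resolved;
  }

  public static void main(String[] args) {
    // After ALTER TABLE ... REPLACE COLUMNS (s1 string, i1 int, f1 float),
    // the Hive schema is (s1, i1, f1) while the file still stores (s, i, f).
    List<String> hive = Arrays.asList("s1", "i1", "f1");
    List<String> file = Arrays.asList("s", "i", "f");
    // Requesting i1 resolves to file column i; prints "[i]".
    System.out.println(resolveByIndex(Arrays.asList("i1"), hive, file));
  }
}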