diff --git data/files/parquet_columnar.txt data/files/parquet_columnar.txt
new file mode 100644
index 0000000..5db1282
--- /dev/null
+++ data/files/parquet_columnar.txt
@@ -0,0 +1,21 @@
+1abc00|1|1.0
+1def01|2|1.1
+1ghi02|3|1.2
+1jkl03|1|1.3
+1mno04|2|1.4
+1pqr05|3|1.0
+1stu06|1|1.1
+1vwx07|2|1.2
+1yza08|3|1.3
+1bcd09|1|1.4
+1efg10|2|1.0
+1hij11|3|1.1
+1klm12|1|1.2
+1nop13|2|1.3
+1qrs14|3|1.4
+1tuv15|1|1.0
+1wxy16|2|1.1
+1zab17|3|1.2
+1cde18|1|1.3
+1fgh19|2|1.4
+1ijk20|3|1.0
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 254e2b0..bec7ae8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -90,6 +90,7 @@
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask;
@@ -3605,7 +3606,8 @@ private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException {
           MetadataTypedColumnsetSerDe.class.getName())
           && !tbl.getSerializationLib().equals(LazySimpleSerDe.class.getName())
           && !tbl.getSerializationLib().equals(ColumnarSerDe.class.getName())
-          && !tbl.getSerializationLib().equals(DynamicSerDe.class.getName())) {
+          && !tbl.getSerializationLib().equals(DynamicSerDe.class.getName())
+          && !tbl.getSerializationLib().equals(ParquetHiveSerDe.class.getName())) {
         throw new HiveException(ErrorMsg.CANNOT_REPLACE_COLUMNS, alterTbl.getOldName());
       }
       tbl.getTTable().getSd().setCols(alterTbl.getNewCols());
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
index 94c780a..df3f83e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.io.ArrayWritable;
@@ -95,7 +96,8 @@
       for (final Integer idx : indexColumnsWanted) {
         typeListWanted.add(tableSchema.getType(listColumns.get(idx)));
       }
-      requestedSchemaByUser = new MessageType(fileSchema.getName(), typeListWanted);
+      requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
+          typeListWanted), fileSchema, configuration);

       return new ReadContext(requestedSchemaByUser, contextMetadata);
     } else {
@@ -123,8 +125,31 @@
       throw new IllegalStateException("ReadContext not initialized properly. " +
           "Don't know the Hive Schema.");
     }
-    final MessageType tableSchema = MessageTypeParser.
-        parseMessageType(metadata.get(HIVE_SCHEMA_KEY));
+    final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
+        parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
+
     return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
   }
+
+  /**
+   * Determine the file column names based on the position within the requested columns and
+   * use that as the requested schema.
+   */
+  private MessageType resolveSchemaAccess(MessageType requestedSchema, MessageType fileSchema,
+      Configuration configuration) {
+    if (configuration.getBoolean(ParquetHiveSerDe.PARQUET_COLUMNAR_ACCESS, false)) {
+      final List<String> listColumns = getColumns(configuration.get(IOConstants.COLUMNS));
+
+      List<Type> requestedTypes = new ArrayList<Type>();
+
+      for (Type t : requestedSchema.getFields()) {
+        int index = listColumns.indexOf(t.getName());
+        requestedTypes.add(fileSchema.getType(index));
+      }
+
+      requestedSchema = new MessageType(requestedSchema.getName(), requestedTypes);
+    }
+
+    return requestedSchema;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
index b689336..6fb7113 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
@@ -65,6 +65,9 @@
  */
 public class ParquetHiveSerDe extends AbstractSerDe {
+  public static final String PARQUET_COLUMNAR_ACCESS_DEFAULT = "parquet.columnar.access.default";
+  public static final String PARQUET_COLUMNAR_ACCESS = "parquet.columnar.access";
+
   public static final Text MAP_KEY = new Text("key");
   public static final Text MAP_VALUE = new Text("value");
   public static final Text MAP = new Text("map");
@@ -112,6 +115,11 @@ public final void initialize(final Configuration conf, final Properties tbl) thr
     rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
     this.objInspector = new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);

+    // Set column access
+    if (conf != null) {
+      conf.set(PARQUET_COLUMNAR_ACCESS, tbl.getProperty(PARQUET_COLUMNAR_ACCESS, conf.get(PARQUET_COLUMNAR_ACCESS_DEFAULT, "false")));
+    }
+
     // Stats part
     stats = new SerDeStats();
     serializedSize = 0;
diff --git ql/src/test/queries/clientpositive/parquet_columnar.q ql/src/test/queries/clientpositive/parquet_columnar.q
new file mode 100644
index 0000000..df71604
--- /dev/null
+++ ql/src/test/queries/clientpositive/parquet_columnar.q
@@ -0,0 +1,31 @@
+DROP TABLE IF EXISTS parquet_columnar_access_stage;
+DROP TABLE IF EXISTS parquet_columnar_access;
+DROP TABLE IF EXISTS parquet_columnar_renamed;
+
+CREATE TABLE parquet_columnar_access_stage (
+    s string,
+    i int,
+    f float
+  ) ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '|';
+
+CREATE TABLE parquet_columnar_access (
+    s string,
+    i int,
+    f float
+  ) STORED AS PARQUET;
+
+CREATE TABLE parquet_columnar_renamed LIKE parquet_columnar_access;
+
+LOAD DATA LOCAL INPATH '../../data/files/parquet_columnar.txt' OVERWRITE INTO TABLE parquet_columnar_access_stage;
+
+INSERT OVERWRITE TABLE parquet_columnar_access SELECT * FROM parquet_columnar_access_stage;
+INSERT OVERWRITE TABLE parquet_columnar_renamed SELECT * FROM parquet_columnar_access_stage;
+
+ALTER TABLE parquet_columnar_renamed REPLACE COLUMNS (s1 string, i1 int, f1 float);
+
+ALTER TABLE parquet_columnar_renamed SET TBLPROPERTIES ('parquet.columnar.access'='true');
+
+SELECT * FROM parquet_columnar_access;
+SELECT * FROM parquet_columnar_renamed;
+SELECT * FROM parquet_columnar_renamed JOIN parquet_columnar_access ON (s=s1);
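
Note (not part of the patch): the following is a minimal standalone sketch of the positional resolution that resolveSchemaAccess performs when parquet.columnar.access is enabled. It uses plain Java collections in place of the Parquet MessageType API, and the class, method, and variable names are hypothetical.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: maps requested Hive columns to Parquet file columns
// by position, which is what lets a table renamed via REPLACE COLUMNS still read
// files whose footers carry the original column names.
public class ColumnarAccessSketch {

  // For each requested column, find its position in the Hive table schema and
  // return the file-schema column stored at that same position.
  static List<String> resolveByPosition(List<String> requestedColumns,
      List<String> tableColumns, List<String> fileColumns) {
    List<String> resolved = new ArrayList<String>();
    for (String requested : requestedColumns) {
      int index = tableColumns.indexOf(requested); // position in the Hive schema
      resolved.add(fileColumns.get(index));        // same position in the file schema
    }
    return resolved;
  }

  public static void main(String[] args) {
    // Hive columns after ALTER TABLE ... REPLACE COLUMNS (s1, i1, f1), while the
    // Parquet footer still carries the original names (s, i, f), as in the .q test.
    List<String> tableColumns = Arrays.asList("s1", "i1", "f1");
    List<String> fileColumns = Arrays.asList("s", "i", "f");

    // A query projecting s1 and f1 resolves to file columns s and f.
    System.out.println(resolveByPosition(Arrays.asList("s1", "f1"), tableColumns, fileColumns));
    // prints: [s, f]
  }
}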