diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index fafd78e..6123c11 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -759,7 +759,7 @@
         "org.apache.hadoop.hive.ql.io.orc.OrcSerde,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe," +
         "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe,org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe," +
         "org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe," +
-        "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe,org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe",
+        "org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe",
         "SerDes retriving schema from metastore. This an internal parameter. Check with the hive dev. team"),
 
     HIVEHISTORYFILELOC("hive.querylog.location",
diff --git data/files/data.parq data/files/data.parq
new file mode 100644
index 0000000..9c46991
Binary files /dev/null and data/files/data.parq differ
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ColInfoFromParquetFile.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ColInfoFromParquetFile.java
new file mode 100644
index 0000000..8926307
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ColInfoFromParquetFile.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.convert;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.DecimalMetadata;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ColInfoFromParquetFile {
+
+  private static final Log LOG = LogFactory.getLog(ColInfoFromParquetFile.class);
+
+  /**
+   * Generates a Hive schema from a Parquet file.
+   *
+   * @param parquetFile Parquet file from which the schema has to be read
+   * @return Pair containing the list of Hive column names and the ":"-separated Hive column types
+   */
+  public Pair<List<String>, String> convert(String parquetFile) {
+    Configuration conf = new Configuration();
+    ParquetMetadata metaData;
+    try {
+      metaData = ParquetFileReader.readFooter(conf, new Path(parquetFile));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    List<String> colNames = new ArrayList<String>();
+    StringBuffer colTypes = new StringBuffer();
+    String hiveSchema = convert(colNames, colTypes, schema);
+
+    LOG.info("Generated hive schema is " + hiveSchema);
+
+    return new ImmutablePair<List<String>, String>(colNames,
+        colTypes.substring(0, colTypes.length() - 1));
+  }
+
+  private String convert(List<String> colNames, StringBuffer colTypes, MessageType parquetSchema) {
+    return convertFields(colNames, colTypes, parquetSchema.getFields());
+  }
+
+  private String convertFields(List<String> colNames, StringBuffer colTypes,
+      List<Type> parquetFields) {
+    StringBuilder hiveSchema = new StringBuilder();
+    for (Type parquetType : parquetFields) {
+      String fieldSchema = convertField(parquetType);
+      if (parquetType.isRepetition(Type.Repetition.REPEATED)) {
+        throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. " +
" + + "Type: " + parquetType); + } + + colNames.add(parquetType.getName()); + colTypes.append(fieldSchema + ":"); + + hiveSchema.append(parquetType.getName() + " " + fieldSchema + ", "); + } + + return hiveSchema.substring(0, hiveSchema.length() - 1); + } + + private String convertField(final Type parquetType) { + if (parquetType.isPrimitive()) { + final PrimitiveType.PrimitiveTypeName parquetPrimitiveTypeName = + parquetType.asPrimitiveType().getPrimitiveTypeName(); + final OriginalType originalType = parquetType.getOriginalType(); + return parquetPrimitiveTypeName.convert( + new PrimitiveType.PrimitiveTypeNameConverter() { + @Override + public String convertBOOLEAN(PrimitiveType.PrimitiveTypeName primitiveTypeName) { + return "boolean"; + } + + @Override + public String convertINT32(PrimitiveType.PrimitiveTypeName primitiveTypeName) { + return "int"; + } + + @Override + public String convertINT64(PrimitiveType.PrimitiveTypeName primitiveTypeName) { + return "bigint"; + } + + @Override + public String convertINT96(PrimitiveType.PrimitiveTypeName primitiveTypeName) { + throw new IllegalArgumentException("INT96 not yet implemented."); + } + + @Override + public String convertFLOAT(PrimitiveType.PrimitiveTypeName primitiveTypeName) { + return "float"; + } + + @Override + public String convertDOUBLE(PrimitiveType.PrimitiveTypeName primitiveTypeName) { + return "double"; + } + + @Override + public String convertFIXED_LEN_BYTE_ARRAY(PrimitiveType.PrimitiveTypeName + primitiveTypeName) { + return "binary"; + } + + @Override + public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) { + if (originalType == OriginalType.UTF8 || originalType == OriginalType.ENUM) { + return "string"; + } else if (originalType == OriginalType.DECIMAL) { + final DecimalMetadata decimalMetadata = parquetType.asPrimitiveType() + .getDecimalMetadata(); + return "decimal(" + decimalMetadata.getPrecision() + "," + + decimalMetadata.getScale() + ")"; + } else { + return "binary"; + } + } + }); + } else { + GroupType parquetGroupType = parquetType.asGroupType(); + OriginalType originalType = parquetGroupType.getOriginalType(); + if (originalType != null) { + switch (originalType) { + case LIST: + if (parquetGroupType.getFieldCount() != 1) { + throw new UnsupportedOperationException("Invalid list type " + parquetGroupType); + } + Type elementType = parquetGroupType.getType(0); + if (!elementType.isRepetition(Type.Repetition.REPEATED)) { + throw new UnsupportedOperationException("Invalid list type " + parquetGroupType); + } + return createHiveArray(elementType); + case MAP: + if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) { + throw new UnsupportedOperationException("Invalid map type " + parquetGroupType); + } + GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType(); + if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED) || + !mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE) || + mapKeyValType.getFieldCount() != 2) { + throw new UnsupportedOperationException("Invalid map type " + parquetGroupType); + } + Type keyType = mapKeyValType.getType(0); + if (!keyType.isPrimitive() || + !keyType.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveType + .PrimitiveTypeName.BINARY) || + !keyType.getOriginalType().equals(OriginalType.UTF8)) { + throw new IllegalArgumentException("Map key type must be binary (UTF8): " + + keyType); + } + Type valueType = mapKeyValType.getType(1); + return createHiveMap(convertField(keyType), 
+                convertField(valueType));
+          case ENUM:
+            return "string";
+          case MAP_KEY_VALUE:
+          case UTF8:
+          default:
+            throw new UnsupportedOperationException("Cannot convert Parquet type " +
+                parquetType);
+        }
+      } else {
+        // if no original type then it's a record
+        return createHiveStruct(parquetGroupType.getFields());
+      }
+    }
+  }
+
+  private String createHiveStruct(List<Type> parquetFields) {
+    StringBuilder structString = new StringBuilder();
+
+    structString.append("struct<");
+    boolean needComma = false;
+    for (Type field: parquetFields) {
+      if (needComma) {
+        structString.append(",");
+      } else {
+        needComma = true;
+      }
+      structString.append(field.getName() + ":" + convertField(field));
+    }
+    structString.append(">");
+
+    return structString.toString();
+  }
+
+  private String createHiveMap(String keyType, String valueType) {
+    return "map<" + keyType + "," + valueType + ">";
+  }
+
+  private String createHiveArray(Type elementType) {
+    return "array<" + convertField(elementType.asGroupType().getFields().get(0)) + ">";
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
index 4effe73..34be855 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
@@ -13,16 +13,10 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.serde;
 
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.parquet.convert.ColInfoFromParquetFile;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -49,8 +43,8 @@
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -67,6 +61,14 @@
 import parquet.hadoop.ParquetWriter;
 import parquet.io.api.Binary;
 
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
 /**
  *
  * A ParquetHiveSerDe for Hive (with the deprecated package mapred)
@@ -94,6 +96,8 @@
     }
   }
 
+  private final String PARQUET_FILE = "parquet.file";
+
   private SerDeStats stats;
   private ObjectInspector objInspector;
 
@@ -121,21 +125,34 @@ public final void initialize(final Configuration conf, final Properties tbl) thr
     // Get compression properties
     compressionType = tbl.getProperty(ParquetOutputFormat.COMPRESSION, DEFAULTCOMPRESSION);
 
-    if (columnNameProperty.length() == 0) {
-      columnNames = new ArrayList<String>();
-    } else {
-      columnNames = Arrays.asList(columnNameProperty.split(","));
-    }
-    if (columnTypeProperty.length() == 0) {
-      columnTypes = new ArrayList<TypeInfo>();
+    if (columnNameProperty.length() == 0 && columnTypeProperty.length() == 0) {
+      final String parquetFile = tbl.getProperty(PARQUET_FILE, null);
+      if (parquetFile == null) {
+        throw new RuntimeException("Either provide schema for table or point to parquet file");
+      }
+
+      Pair<List<String>, String> colNameAndTypes = new ColInfoFromParquetFile().convert
+          (parquetFile);
+      columnNames = colNameAndTypes.getLeft();
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(colNameAndTypes.getRight());
     } else {
-      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+      if (columnNameProperty.length() == 0) {
+        columnNames = new ArrayList<String>();
+      } else {
+        columnNames = Arrays.asList(columnNameProperty.split(","));
+      }
+      if (columnTypeProperty.length() == 0) {
+        columnTypes = new ArrayList<TypeInfo>();
+      } else {
+        columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+      }
     }
     if (columnNames.size() != columnTypes.size()) {
       throw new IllegalArgumentException("ParquetHiveSerde initialization failed. Number of column " +
          "name and column type differs. columnNames = " + columnNames + ", columnTypes = " +
          columnTypes);
     }
+
     // Create row related objects
     rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
     this.objInspector = new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
diff --git ql/src/test/queries/clientpositive/parquet_create_gen_schema.q ql/src/test/queries/clientpositive/parquet_create_gen_schema.q
new file mode 100644
index 0000000..4778241
--- /dev/null
+++ ql/src/test/queries/clientpositive/parquet_create_gen_schema.q
@@ -0,0 +1,16 @@
+DROP TABLE parquet_create;
+
+CREATE TABLE parquet_create
+ROW FORMAT SERDE
+ 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+STORED AS INPUTFORMAT
+ 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+OUTPUTFORMAT
+ 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+TBLPROPERTIES ('parquet.file'='../../data/files/data.parq');
+
+DESCRIBE FORMATTED parquet_create;
+
+LOAD DATA LOCAL INPATH '../../data/files/data.parq' OVERWRITE INTO TABLE parquet_create;
+
+SELECT * FROM parquet_create;
\ No newline at end of file
diff --git ql/src/test/queries/clientpositive/parquet_create_gen_schema1.q ql/src/test/queries/clientpositive/parquet_create_gen_schema1.q
new file mode 100644
index 0000000..b3572b0
--- /dev/null
+++ ql/src/test/queries/clientpositive/parquet_create_gen_schema1.q
@@ -0,0 +1,16 @@
+DROP TABLE parquet_create;
+
+CREATE TABLE parquet_create
+ROW FORMAT SERDE
+ 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+STORED AS INPUTFORMAT
+ 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+OUTPUTFORMAT
+ 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+TBLPROPERTIES ('parquet.file'='../../data/files/dec.parq');
+
+DESCRIBE FORMATTED parquet_create;
+
+LOAD DATA LOCAL INPATH '../../data/files/dec.parq' OVERWRITE INTO TABLE parquet_create;
+
+SELECT * FROM parquet_create;
\ No newline at end of file
diff --git ql/src/test/results/clientpositive/parquet_create_gen_schema.q.out ql/src/test/results/clientpositive/parquet_create_gen_schema.q.out
new file mode 100644
index 0000000..299f3ea
--- /dev/null
+++ ql/src/test/results/clientpositive/parquet_create_gen_schema.q.out
@@ -0,0 +1,84 @@
+PREHOOK: query: DROP TABLE parquet_create
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE parquet_create
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE parquet_create
+ROW FORMAT SERDE
+ 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+STORED AS INPUTFORMAT
+ 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+OUTPUTFORMAT
+ 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+TBLPROPERTIES ('parquet.file'='../../data/files/data.parq')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@parquet_create
+POSTHOOK: query: CREATE TABLE parquet_create
+ROW FORMAT SERDE
+ 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+STORED AS INPUTFORMAT
+ 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+OUTPUTFORMAT
+ 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+TBLPROPERTIES ('parquet.file'='../../data/files/data.parq')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_create
+PREHOOK: query: DESCRIBE FORMATTED parquet_create
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@parquet_create
+POSTHOOK: query: DESCRIBE FORMATTED parquet_create
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@parquet_create
+# col_name	data_type	comment
+
+bo1	boolean
+ti1	int
+si1	int
+i1	int
+bi1	bigint
+f1	float
+d1	double
+s1	string
+m1	map<string,string>
+l1	array<int>
+st1	struct<c1:int,c2:string>
+
+# Detailed Table Information
+Database:	default
+#### A masked pattern was here ####
+Protect Mode:	None
+Retention:	0
+#### A masked pattern was here ####
+Table Type:	MANAGED_TABLE
+Table Parameters:
+	parquet.file	../../data/files/data.parq
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library:	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
+InputFormat:	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+OutputFormat:	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
+Compressed:	No
+Num Buckets:	-1
+Bucket Columns:	[]
+Sort Columns:	[]
+Storage Desc Params:
+	serialization.format	1
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/data.parq' OVERWRITE INTO TABLE parquet_create
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@parquet_create
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/data.parq' OVERWRITE INTO TABLE parquet_create
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@parquet_create
+PREHOOK: query: SELECT * FROM parquet_create
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_create
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM parquet_create
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_create
+#### A masked pattern was here ####
+true	10	100	1000	10000	4.0	20.0	hello	{"k1":"v1"}	[100,200]	{"c1":10,"c2":"foo"}
diff --git ql/src/test/results/clientpositive/parquet_create_gen_schema1.q.out ql/src/test/results/clientpositive/parquet_create_gen_schema1.q.out
new file mode 100644
index 0000000..5ed9acc
--- /dev/null
+++ ql/src/test/results/clientpositive/parquet_create_gen_schema1.q.out
@@ -0,0 +1,84 @@
+PREHOOK: query: DROP TABLE parquet_create
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE parquet_create
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE parquet_create
+ROW FORMAT SERDE
+ 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+STORED AS INPUTFORMAT
+ 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+OUTPUTFORMAT
+ 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+TBLPROPERTIES ('parquet.file'='../../data/files/dec.parq')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@parquet_create
+POSTHOOK: query: CREATE TABLE parquet_create
+ROW FORMAT SERDE
+ 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+STORED AS INPUTFORMAT
+ 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+OUTPUTFORMAT
+ 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+TBLPROPERTIES ('parquet.file'='../../data/files/dec.parq')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_create
+PREHOOK: query: DESCRIBE FORMATTED parquet_create
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@parquet_create
+POSTHOOK: query: DESCRIBE FORMATTED parquet_create
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@parquet_create
+# col_name	data_type	comment
+
+name	string
+value	decimal(5,2)
+
+# Detailed Table Information
+Database:	default
+#### A masked pattern was here ####
+Protect Mode:	None
+Retention:	0
+#### A masked pattern was here ####
+Table Type:	MANAGED_TABLE
+Table Parameters:
+	parquet.file	../../data/files/dec.parq
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library:	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
+InputFormat:	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+OutputFormat:	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
+Compressed:	No
+Num Buckets:	-1
+Bucket Columns:	[]
+Sort Columns:	[]
+Storage Desc Params:
+	serialization.format	1
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/dec.parq' OVERWRITE INTO TABLE parquet_create
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@parquet_create
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/dec.parq' OVERWRITE INTO TABLE parquet_create
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@parquet_create
+PREHOOK: query: SELECT * FROM parquet_create
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_create
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM parquet_create
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_create
+#### A masked pattern was here ####
+Tom	234.79
+Beck	77.34
+Snow	55.71
+Mary	4.33
+Cluck	5.96
+Tom	12.25
+Mary	33.33
+Tom	0.19
+Beck	3.15
+Beck	7.99