diff --git data/files/parquet_type_promotion.txt data/files/parquet_type_promotion.txt
new file mode 100644
index 0000000..dc3e13e
--- /dev/null
+++ data/files/parquet_type_promotion.txt
@@ -0,0 +1,4 @@
+100|5643|0.3|0.7|k1:11|7,17,22|10,20|k11:4.0|2.3,3.0,5.5|5.7,4.8
+200|5643|0.4|0.8|k2:14|8,17,24|20,20|v11:5.0|3.3,3.1,5.6|5.8,4.7
+300|7643|0.4|0.9|k3:12|9,17,25|30,60|b11:6.0|4.3,3.2,5.7|5.9,4.6
+400|8643|0.4|0.5|k4:15|7,18,27|50,70|d11:8.0|6.3,3.3,5.8|5.0,4.5
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java
index 3261e4b..1bbdcec 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java
@@ -14,6 +14,7 @@
 package org.apache.hadoop.hive.ql.io.parquet.convert;
 
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.RecordMaterializer;
@@ -31,9 +32,10 @@
 
   private final HiveStructConverter root;
 
-  public DataWritableRecordConverter(final GroupType requestedSchema, final Map<String, String> metadata) {
+  public DataWritableRecordConverter(final GroupType requestedSchema, final Map<String, String> metadata, TypeInfo hiveTypeInfo) {
     this.root = new HiveStructConverter(requestedSchema,
-        MessageTypeParser.parseMessageType(metadata.get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)), metadata);
+        MessageTypeParser.parseMessageType(metadata.get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)),
+        metadata, hiveTypeInfo);
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
index 17849fa..ec0dd81 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
@@ -25,6 +25,8 @@
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -49,7 +51,7 @@
   EDOUBLE_CONVERTER(Double.TYPE) {
     @Override
-    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) {
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
       return new PrimitiveConverter() {
         @Override
         public void addDouble(final double value) {
@@ -60,7 +62,7 @@ public void addDouble(final double value) {
   },
   EBOOLEAN_CONVERTER(Boolean.TYPE) {
     @Override
-    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) {
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
       return new PrimitiveConverter() {
        @Override
        public void addBoolean(final boolean value) {
@@ -71,29 +73,47 @@ public void addBoolean(final boolean value) {
   },
   EFLOAT_CONVERTER(Float.TYPE) {
     @Override
-    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) {
-      return new PrimitiveConverter() {
-        @Override
-        public void addFloat(final float value) {
-          parent.set(index, new FloatWritable(value));
-        }
-      };
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
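+      // The Hive table schema now declares DOUBLE while the file stores FLOAT, so widen each value on read.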
+      if (hiveTypeInfo != null && hiveTypeInfo.equals(TypeInfoFactory.doubleTypeInfo)) {
+        return new PrimitiveConverter() {
+          @Override
+          public void addFloat(final float value) {
+            parent.set(index, new DoubleWritable((double) value));
+          }
+        };
+      } else {
+        return new PrimitiveConverter() {
+          @Override
+          public void addFloat(final float value) {
+            parent.set(index, new FloatWritable(value));
+          }
+        };
+      }
     }
   },
   EINT32_CONVERTER(Integer.TYPE) {
     @Override
-    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) {
-      return new PrimitiveConverter() {
-        @Override
-        public void addInt(final int value) {
-          parent.set(index, new IntWritable(value));
-        }
-      };
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
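+      // The Hive table schema now declares BIGINT while the file stores INT32, so widen each value on read.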
+      if (hiveTypeInfo != null && hiveTypeInfo.equals(TypeInfoFactory.longTypeInfo)) {
+        return new PrimitiveConverter() {
+          @Override
+          public void addInt(final int value) {
+            parent.set(index, new LongWritable((long) value));
+          }
+        };
+      } else {
+        return new PrimitiveConverter() {
+          @Override
+          public void addInt(final int value) {
+            parent.set(index, new IntWritable(value));
+          }
+        };
+      }
     }
   },
   EINT64_CONVERTER(Long.TYPE) {
     @Override
-    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) {
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
       return new PrimitiveConverter() {
         @Override
         public void addLong(final long value) {
@@ -104,7 +124,7 @@ public void addLong(final long value) {
   },
   EBINARY_CONVERTER(Binary.class) {
     @Override
-    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) {
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
       return new BinaryConverter<BytesWritable>(type, parent, index) {
         @Override
         protected BytesWritable convert(Binary binary) {
@@ -115,7 +135,7 @@ protected BytesWritable convert(Binary binary) {
   },
   ESTRING_CONVERTER(String.class) {
     @Override
-    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) {
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
       return new BinaryConverter<Text>(type, parent, index) {
         @Override
         protected Text convert(Binary binary) {
@@ -126,7 +146,7 @@ protected Text convert(Binary binary) {
   },
   EDECIMAL_CONVERTER(BigDecimal.class) {
     @Override
-    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) {
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
       return new BinaryConverter<HiveDecimalWritable>(type, parent, index) {
         @Override
         protected HiveDecimalWritable convert(Binary binary) {
@@ -137,7 +157,7 @@ protected HiveDecimalWritable convert(Binary binary) {
   },
   ETIMESTAMP_CONVERTER(TimestampWritable.class) {
     @Override
-    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) {
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
       return new BinaryConverter<TimestampWritable>(type, parent, index) {
         @Override
         protected TimestampWritable convert(Binary binary) {
@@ -154,7 +174,7 @@ protected TimestampWritable convert(Binary binary) {
   },
   EDATE_CONVERTER(DateWritable.class) {
     @Override
-    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) {
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
       return new PrimitiveConverter() {
         @Override
         public void addInt(final int value) {
@@ -174,26 +194,26 @@ private ETypeConverter(final Class<?> type) {
     return _type;
   }
 
-  abstract PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent);
+  abstract PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo);
 
   public static PrimitiveConverter getNewConverter(final PrimitiveType type, final int index,
-      final ConverterParent parent) {
+      final ConverterParent parent, TypeInfo hiveTypeInfo) {
     if (type.isPrimitive() && (type.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.INT96))) {
       //TODO- cleanup once parquet support Timestamp type annotation.
-      return ETypeConverter.ETIMESTAMP_CONVERTER.getConverter(type, index, parent);
+      return ETypeConverter.ETIMESTAMP_CONVERTER.getConverter(type, index, parent, hiveTypeInfo);
     }
     if (OriginalType.DECIMAL == type.getOriginalType()) {
-      return EDECIMAL_CONVERTER.getConverter(type, index, parent);
+      return EDECIMAL_CONVERTER.getConverter(type, index, parent, hiveTypeInfo);
     } else if (OriginalType.UTF8 == type.getOriginalType()) {
-      return ESTRING_CONVERTER.getConverter(type, index, parent);
+      return ESTRING_CONVERTER.getConverter(type, index, parent, hiveTypeInfo);
     } else if (OriginalType.DATE == type.getOriginalType()) {
-      return EDATE_CONVERTER.getConverter(type, index, parent);
+      return EDATE_CONVERTER.getConverter(type, index, parent, hiveTypeInfo);
     }
 
     Class<?> javaType = type.getPrimitiveTypeName().javaType;
     for (final ETypeConverter eConverter : values()) {
       if (eConverter.getType() == javaType) {
-        return eConverter.getConverter(type, index, parent);
+        return eConverter.getConverter(type, index, parent, hiveTypeInfo);
       }
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java
index 06f3d32..5fb58c9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java
@@ -22,6 +22,9 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.parquet.io.api.Converter;
@@ -37,21 +40,21 @@
   public static HiveGroupConverter forMap(GroupType mapType,
                                           ConverterParent parent,
-                                          int index) {
+                                          int index, TypeInfo hiveTypeInfo) {
     return new HiveCollectionConverter(
-        mapType, parent, index, true /* its a map */ );
+        mapType, parent, index, true /* its a map */, hiveTypeInfo );
   }
 
   public static HiveGroupConverter forList(GroupType listType,
                                            ConverterParent parent,
-                                           int index) {
+                                           int index, TypeInfo hiveTypeInfo) {
     return new HiveCollectionConverter(
-        listType, parent, index, false /* not a map */);
+        listType, parent, index, false /* not a map */, hiveTypeInfo);
   }
 
   private HiveCollectionConverter(GroupType collectionType,
                                   ConverterParent parent,
-                                  int index, boolean isMap) {
+                                  int index, boolean isMap, TypeInfo hiveTypeInfo) {
     setMetadata(parent.getMetadata());
     this.collectionType = collectionType;
     this.parent = parent;
@@ -59,12 +62,21 @@ private HiveCollectionConverter(GroupType collectionType,
     Type repeatedType = collectionType.getType(0);
     if (isMap) {
       this.innerConverter = new KeyValueConverter(
-          repeatedType.asGroupType(), this);
+          repeatedType.asGroupType(), this, hiveTypeInfo);
     } else if (isElementType(repeatedType, collectionType.getName())) {
-      this.innerConverter = getConverterFromDescription(repeatedType, 0, this);
+      this.innerConverter = getConverterFromDescription(repeatedType, 0, this, extractListCompatibleType(hiveTypeInfo));
    } else {
      this.innerConverter = new ElementConverter(
-          repeatedType.asGroupType(), this);
+          repeatedType.asGroupType(), this, extractListCompatibleType(hiveTypeInfo));
+    }
+  }
+
+  private TypeInfo extractListCompatibleType(TypeInfo hiveTypeInfo) {
+    if (hiveTypeInfo != null && hiveTypeInfo instanceof ListTypeInfo) {
+      return ((ListTypeInfo) hiveTypeInfo).getListElementTypeInfo();
+    } else {
+      return hiveTypeInfo; // so that a map column can also read list-of-struct data (i.e. list<struct<x,y>> --> map<x,y>)
     }
   }
 
@@ -97,13 +109,13 @@ public void set(int index, Writable value) {
     private final Converter valueConverter;
     private Writable[] keyValue = null;
 
-    public KeyValueConverter(GroupType keyValueType, HiveGroupConverter parent) {
+    public KeyValueConverter(GroupType keyValueType, HiveGroupConverter parent, TypeInfo hiveTypeInfo) {
       setMetadata(parent.getMetadata());
       this.parent = parent;
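+      // Hand the map's key and value TypeInfos to the child converters; null means no Hive type is available.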
       this.keyConverter = getConverterFromDescription(
-          keyValueType.getType(0), 0, this);
+          keyValueType.getType(0), 0, this, hiveTypeInfo == null ? null : ((MapTypeInfo) hiveTypeInfo).getMapKeyTypeInfo());
       this.valueConverter = getConverterFromDescription(
-          keyValueType.getType(1), 1, this);
+          keyValueType.getType(1), 1, this, hiveTypeInfo == null ? null : ((MapTypeInfo) hiveTypeInfo).getMapValueTypeInfo());
     }
 
     @Override
@@ -140,11 +152,11 @@ public void end() {
     private final Converter elementConverter;
     private Writable element = null;
 
-    public ElementConverter(GroupType repeatedType, HiveGroupConverter parent) {
+    public ElementConverter(GroupType repeatedType, HiveGroupConverter parent, TypeInfo hiveTypeInfo) {
       setMetadata(parent.getMetadata());
       this.parent = parent;
       this.elementConverter = getConverterFromDescription(
-          repeatedType.getType(0), 0, this);
+          repeatedType.getType(0), 0, this, hiveTypeInfo);
     }
 
     @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java
index b1ca85a..e732f4a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java
@@ -13,6 +13,7 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.convert;
 
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
@@ -36,39 +37,41 @@ public void setMetadata(Map<String, String> metadata) {
     return metadata;
   }
 
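+  // Every factory overload now carries the column's Hive TypeInfo so nested primitive converters can widen values.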
-  protected static PrimitiveConverter getConverterFromDescription(PrimitiveType type, int index, ConverterParent parent) {
+  protected static PrimitiveConverter getConverterFromDescription(PrimitiveType type, int index, ConverterParent
+      parent, TypeInfo hiveTypeInfo) {
     if (type == null) {
       return null;
     }
 
-    return ETypeConverter.getNewConverter(type, index, parent);
+    return ETypeConverter.getNewConverter(type, index, parent, hiveTypeInfo);
   }
 
-  protected static HiveGroupConverter getConverterFromDescription(GroupType type, int index, ConverterParent parent) {
+  protected static HiveGroupConverter getConverterFromDescription(GroupType type, int index, ConverterParent parent,
+      TypeInfo hiveTypeInfo) {
     if (type == null) {
       return null;
     }
 
     OriginalType annotation = type.getOriginalType();
     if (annotation == OriginalType.LIST) {
-      return HiveCollectionConverter.forList(type, parent, index);
+      return HiveCollectionConverter.forList(type, parent, index, hiveTypeInfo);
     } else if (annotation == OriginalType.MAP || annotation == OriginalType.MAP_KEY_VALUE) {
-      return HiveCollectionConverter.forMap(type, parent, index);
+      return HiveCollectionConverter.forMap(type, parent, index, hiveTypeInfo);
     }
 
-    return new HiveStructConverter(type, parent, index);
+    return new HiveStructConverter(type, parent, index, hiveTypeInfo);
   }
 
-  protected static Converter getConverterFromDescription(Type type, int index, ConverterParent parent) {
+  protected static Converter getConverterFromDescription(Type type, int index, ConverterParent parent, TypeInfo hiveTypeInfo) {
     if (type == null) {
       return null;
     }
 
     if (type.isPrimitive()) {
-      return getConverterFromDescription(type.asPrimitiveType(), index, parent);
+      return getConverterFromDescription(type.asPrimitiveType(), index, parent, hiveTypeInfo);
     }
 
-    return getConverterFromDescription(type.asGroupType(), index, parent);
+    return getConverterFromDescription(type.asGroupType(), index, parent, hiveTypeInfo);
   }
 
   public abstract void set(int index, Writable value);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java
index 9c35a9f..1b43dd9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java
@@ -17,6 +17,9 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.*;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.parquet.io.api.Converter;
@@ -31,44 +34,61 @@ public class HiveStructConverter extends HiveGroupConverter {
 
   private final int totalFieldCount;
-  private final Converter[] converters;
+  private Converter[] converters;
   private final ConverterParent parent;
   private final int index;
   private Writable[] writables;
-  private final List<Repeated> repeatedConverters;
+  private List<Repeated> repeatedConverters;
   private boolean reuseWritableArray = false;
+  private List<String> hiveFieldNames;
+  private List<TypeInfo> hiveFieldTypeInfos;
 
-  public HiveStructConverter(final GroupType requestedSchema, final GroupType tableSchema, Map<String, String> metadata) {
-    this(requestedSchema, null, 0, tableSchema);
+  public HiveStructConverter(final GroupType requestedSchema, final GroupType tableSchema,
+      Map<String, String> metadata, TypeInfo hiveTypeInfo) {
     setMetadata(metadata);
     this.reuseWritableArray = true;
     this.writables = new Writable[tableSchema.getFieldCount()];
+    this.parent = null;
+    this.index = 0;
+    this.totalFieldCount = tableSchema.getFieldCount();
+    init(requestedSchema, null, 0, tableSchema, hiveTypeInfo);
   }
 
   public HiveStructConverter(final GroupType groupType, final ConverterParent parent,
-      final int index) {
-    this(groupType, parent, index, groupType);
+      final int index, TypeInfo hiveTypeInfo) {
+    this(groupType, parent, index, groupType, hiveTypeInfo);
   }
 
   public HiveStructConverter(final GroupType selectedGroupType,
-      final ConverterParent parent, final int index, final GroupType containingGroupType) {
-    if (parent != null) {
-      setMetadata(parent.getMetadata());
-    }
+      final ConverterParent parent, final int index, final GroupType containingGroupType, TypeInfo hiveTypeInfo) {
     this.parent = parent;
     this.index = index;
     this.totalFieldCount = containingGroupType.getFieldCount();
+    init(selectedGroupType, parent, index, containingGroupType, hiveTypeInfo);
+  }
+
+  private void init(final GroupType selectedGroupType,
+      final ConverterParent parent, final int index, final GroupType containingGroupType, TypeInfo hiveTypeInfo) {
+    if (parent != null) {
+      setMetadata(parent.getMetadata());
+    }
     final int selectedFieldCount = selectedGroupType.getFieldCount();
 
     converters = new Converter[selectedFieldCount];
     this.repeatedConverters = new ArrayList<Repeated>();
 
+    if (hiveTypeInfo != null && hiveTypeInfo.getCategory().equals(ObjectInspector.Category.STRUCT)) {
+      this.hiveFieldNames = ((StructTypeInfo) hiveTypeInfo).getAllStructFieldNames();
+      this.hiveFieldTypeInfos = ((StructTypeInfo) hiveTypeInfo).getAllStructFieldTypeInfos();
+    }
+
     List<Type> selectedFields = selectedGroupType.getFields();
     for (int i = 0; i < selectedFieldCount; i++) {
       Type subtype = selectedFields.get(i);
       if (containingGroupType.getFields().contains(subtype)) {
         int fieldIndex = containingGroupType.getFieldIndex(subtype.getName());
-        converters[i] = getFieldConverter(subtype, fieldIndex);
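+        // Resolve the Hive-declared type for this field (by name for structs, by position for map-over-struct reads).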
type [" + containingGroupType + "] does not contain requested field: " + subtype); @@ -76,20 +96,56 @@ public HiveStructConverter(final GroupType selectedGroupType, } } - private Converter getFieldConverter(Type type, int fieldIndex) { + private TypeInfo getFieldTypeIgnoreCase(TypeInfo hiveTypeInfo, String fieldName, int fieldIndex) { + if (hiveTypeInfo == null) { + return null; + } else if (hiveTypeInfo.getCategory().equals(ObjectInspector.Category.STRUCT)) { + return getStructFieldTypeInfo(fieldName, fieldIndex); + } else if (hiveTypeInfo.getCategory().equals(ObjectInspector.Category.MAP)) { + //This cover the case where hive table may have map but the data file is + // of type array> + //Using index in place of type name. + if (fieldIndex == 0) { + return ((MapTypeInfo) hiveTypeInfo).getMapKeyTypeInfo(); + } else if (fieldIndex == 1) { + return ((MapTypeInfo) hiveTypeInfo).getMapValueTypeInfo(); + } else {//Other fields are skipped for this case + return null; + } + } + throw new RuntimeException("Unknown hive type info " + hiveTypeInfo + " when searching for field " + fieldName); + } + + private TypeInfo getStructFieldTypeInfo(String field, int fieldIndex) { + String fieldLowerCase = field.toLowerCase(); + if (Boolean.valueOf(getMetadata().get(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS)) + && fieldIndex < hiveFieldNames.size()) { + return hiveFieldTypeInfos.get(fieldIndex); + } + for (int i = 0; i < hiveFieldNames.size(); i++) { + if (fieldLowerCase.equalsIgnoreCase(hiveFieldNames.get(i))) { + return hiveFieldTypeInfos.get(i); + } + } + throw new RuntimeException("cannot find field " + field + + " in " + hiveFieldNames); + } + + private Converter getFieldConverter(Type type, int fieldIndex, TypeInfo hiveTypeInfo) { Converter converter; if (type.isRepetition(Type.Repetition.REPEATED)) { if (type.isPrimitive()) { converter = new Repeated.RepeatedPrimitiveConverter( - type.asPrimitiveType(), this, fieldIndex); + type.asPrimitiveType(), this, fieldIndex, hiveTypeInfo); } else { converter = new Repeated.RepeatedGroupConverter( - type.asGroupType(), this, fieldIndex); + type.asGroupType(), this, fieldIndex, hiveTypeInfo == null ? 
+    if (Boolean.valueOf(getMetadata().get(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS))
+        && fieldIndex < hiveFieldNames.size()) {
+      return hiveFieldTypeInfos.get(fieldIndex);
+    }
+    for (int i = 0; i < hiveFieldNames.size(); i++) {
+      if (fieldLowerCase.equalsIgnoreCase(hiveFieldNames.get(i))) {
+        return hiveFieldTypeInfos.get(i);
+      }
+    }
+    throw new RuntimeException("cannot find field " + field
+        + " in " + hiveFieldNames);
+  }
+
+  private Converter getFieldConverter(Type type, int fieldIndex, TypeInfo hiveTypeInfo) {
     Converter converter;
     if (type.isRepetition(Type.Repetition.REPEATED)) {
       if (type.isPrimitive()) {
         converter = new Repeated.RepeatedPrimitiveConverter(
-            type.asPrimitiveType(), this, fieldIndex);
+            type.asPrimitiveType(), this, fieldIndex, hiveTypeInfo);
       } else {
         converter = new Repeated.RepeatedGroupConverter(
-            type.asGroupType(), this, fieldIndex);
+            type.asGroupType(), this, fieldIndex, hiveTypeInfo == null ? null : ((ListTypeInfo) hiveTypeInfo)
+            .getListElementTypeInfo());
       }
 
       repeatedConverters.add((Repeated) converter);
     } else {
-      converter = getConverterFromDescription(type, fieldIndex, this);
+      converter = getConverterFromDescription(type, fieldIndex, this, hiveTypeInfo);
     }
 
     return converter;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java
index c0af291..a7fad71 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.parquet.column.Dictionary;
@@ -65,12 +66,12 @@ public void setMetadata(Map<String, String> metadata) {
     private final int index;
     private final List<Writable> list = new ArrayList<Writable>();
 
-    public RepeatedPrimitiveConverter(PrimitiveType primitiveType, ConverterParent parent, int index) {
+    public RepeatedPrimitiveConverter(PrimitiveType primitiveType, ConverterParent parent, int index, TypeInfo hiveTypeInfo) {
       setMetadata(parent.getMetadata());
       this.primitiveType = primitiveType;
       this.parent = parent;
       this.index = index;
-      this.wrapped = HiveGroupConverter.getConverterFromDescription(primitiveType, 0, this);
+      this.wrapped = HiveGroupConverter.getConverterFromDescription(primitiveType, 0, this, hiveTypeInfo);
     }
 
     @Override
@@ -149,12 +150,12 @@ public void set(int index, Writable value) {
 
     private final Map<String, String> metadata = new HashMap<String, String>();
 
-    public RepeatedGroupConverter(GroupType groupType, ConverterParent parent, int index) {
+    public RepeatedGroupConverter(GroupType groupType, ConverterParent parent, int index, TypeInfo hiveTypeInfo) {
       setMetadata(parent.getMetadata());
       this.groupType = groupType;
       this.parent = parent;
       this.index = index;
-      this.wrapped = HiveGroupConverter.getConverterFromDescription(groupType, 0, this);
+      this.wrapped = HiveGroupConverter.getConverterFromDescription(groupType, 0, this, hiveTypeInfo);
     }
 
     @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
index 97f228f..53f3b72 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.util.StringUtils;
@@ -52,7 +53,7 @@
 
   public static final String HIVE_TABLE_AS_PARQUET_SCHEMA = "HIVE_TABLE_SCHEMA";
   public static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.column.index.access";
-
+  private TypeInfo hiveTypeInfo;
   /**
    * From a string which columns names (including hive column), return a list
    * of string columns
@@ -204,6 +205,8 @@ private static MessageType getSchemaByIndex(MessageType schema, List<String> col
 
     if (columnNames != null) {
       List<String> columnNamesList = getColumnNames(columnNames);
+      String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES);
+      List<TypeInfo> columnTypesList = getColumnTypes(columnTypes);
       MessageType tableSchema;
       if (indexAccess) {
@@ -216,13 +219,13 @@ private static MessageType getSchemaByIndex(MessageType schema, List<String> col
         tableSchema = getSchemaByIndex(fileSchema, columnNamesList, indexSequence);
       } else {
-        String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES);
-        List<TypeInfo> columnTypesList = getColumnTypes(columnTypes);
         tableSchema = getSchemaByName(fileSchema, columnNamesList, columnTypesList);
       }
 
       contextMetadata.put(HIVE_TABLE_AS_PARQUET_SCHEMA, tableSchema.toString());
+      contextMetadata.put(PARQUET_COLUMN_INDEX_ACCESS, String.valueOf(indexAccess));
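+      // Hive's declared table schema, as a struct TypeInfo; the record converter uses it to decide which values to widen.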
+      this.hiveTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
 
       List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
       if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
@@ -262,6 +265,6 @@ private static MessageType getSchemaByIndex(MessageType schema, List<String> col
       metadata.put(key, String.valueOf(HiveConf.getBoolVar(
         configuration, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)));
     }
-    return new DataWritableRecordConverter(readContext.getRequestedSchema(), metadata);
+    return new DataWritableRecordConverter(readContext.getRequestedSchema(), metadata, hiveTypeInfo);
   }
 }
diff --git ql/src/test/queries/clientpositive/parquet_columnar.q ql/src/test/queries/clientpositive/parquet_columnar.q
index b5aca1b..d6daeec 100644
--- ql/src/test/queries/clientpositive/parquet_columnar.q
+++ ql/src/test/queries/clientpositive/parquet_columnar.q
@@ -28,3 +28,7 @@ SELECT * FROM parquet_columnar_access;
 
 ALTER TABLE parquet_columnar_access REPLACE COLUMNS (s1 string, x1 int, y1 int, f1 float);
 SELECT * FROM parquet_columnar_access;
+
+ALTER TABLE parquet_columnar_access REPLACE COLUMNS (s1 string, x1 bigint, y1 int, f1 double);
+
+SELECT * FROM parquet_columnar_access;
diff --git ql/src/test/queries/clientpositive/parquet_type_promotion.q ql/src/test/queries/clientpositive/parquet_type_promotion.q
new file mode 100644
index 0000000..f9a8562
--- /dev/null
+++ ql/src/test/queries/clientpositive/parquet_type_promotion.q
@@ -0,0 +1,70 @@
+DROP TABLE parquet_type_promotion_staging;
+DROP TABLE parquet_type_promotion;
+
+CREATE TABLE parquet_type_promotion_staging (
+  cint int,
+  clong bigint,
+  cfloat float,
+  cdouble double,
+  m1 map<string, int>,
+  l1 array<int>,
+  st1 struct<c1:int, c2:int>,
+  fm1 map<string, float>,
+  fl1 array<float>,
+  fst1 struct<c1:float, c2:double>
+) ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+COLLECTION ITEMS TERMINATED BY ','
+MAP KEYS TERMINATED BY ':';
+
+LOAD DATA LOCAL INPATH '../../data/files/parquet_type_promotion.txt' OVERWRITE INTO TABLE parquet_type_promotion_staging;
+
+SELECT * FROM parquet_type_promotion_staging;
+
+CREATE TABLE parquet_type_promotion (
+  cint int,
+  clong bigint,
+  cfloat float,
+  cdouble double,
+  m1 map<string, int>,
+  l1 array<int>,
+  st1 struct<c1:int, c2:int>,
+  fm1 map<string, float>,
+  fl1 array<float>,
+  fst1 struct<c1:float, c2:double>
+
+) STORED AS PARQUET;
+
+INSERT OVERWRITE TABLE parquet_type_promotion
+  SELECT * FROM parquet_type_promotion_staging;
+
+SELECT * FROM parquet_type_promotion;
+
+ALTER TABLE parquet_type_promotion REPLACE COLUMNS(
+  cint bigint,
+  clong bigint,
+  cfloat double,
+  cdouble double,
+  m1 map<string, bigint>,
+  l1 array<bigint>,
+  st1 struct<c1:bigint, c2:bigint>,
+  fm1 map<string, double>,
+  fl1 array<double>,
+  fst1 struct<c1:double, c2:double>
+);
+
+SELECT * FROM parquet_type_promotion;
+
+-- This test covers the case where array<struct<f1,f2,..,fn>> data
+-- can be retrieved using map<k,v>.
+-- It also tests that an array of structs with more than 2 fields
+-- can be read as map<k,v> as well.
+
+DROP TABLE arrays_of_struct_to_map;
+CREATE TABLE arrays_of_struct_to_map (locations1 array<struct<c1:int,c2:int>>, locations2 array<struct<f1:int,f2:int,f3:int>>) STORED AS PARQUET;
+INSERT INTO TABLE arrays_of_struct_to_map select array(named_struct("c1",1,"c2",2)), array(named_struct("f1",
+77,"f2",88,"f3",99)) FROM parquet_type_promotion LIMIT 1;
+SELECT * FROM arrays_of_struct_to_map;
+ALTER TABLE arrays_of_struct_to_map REPLACE COLUMNS (locations1 map<int,int>, locations2 map<int,int>);
+SELECT * FROM arrays_of_struct_to_map;
diff --git ql/src/test/results/clientpositive/parquet_columnar.q.out ql/src/test/results/clientpositive/parquet_columnar.q.out
index 934333b..fd28f5c 100644
--- ql/src/test/results/clientpositive/parquet_columnar.q.out
+++ ql/src/test/results/clientpositive/parquet_columnar.q.out
@@ -137,3 +137,40 @@ POSTHOOK: Input: default@parquet_columnar_access
 1cde18	1	2	1.3
 1fgh19	2	3	1.4
 1ijk20	3	4	1.0
+PREHOOK: query: ALTER TABLE parquet_columnar_access REPLACE COLUMNS (s1 string, x1 bigint, y1 int, f1 double)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@parquet_columnar_access
+PREHOOK: Output: default@parquet_columnar_access
+POSTHOOK: query: ALTER TABLE parquet_columnar_access REPLACE COLUMNS (s1 string, x1 bigint, y1 int, f1 double)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@parquet_columnar_access
+POSTHOOK: Output: default@parquet_columnar_access
+PREHOOK: query: SELECT * FROM parquet_columnar_access
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_columnar_access
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM parquet_columnar_access
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_columnar_access
+#### A masked pattern was here ####
+1abc00	1	2	1.0
+1def01	2	3	1.100000023841858
+1ghi02	3	4	1.2000000476837158
+1jkl03	1	2	1.2999999523162842
+1mno04	2	3	1.399999976158142
+1pqr05	3	4	1.0
+1stu06	1	2	1.100000023841858
+1vwx07	2	3	1.2000000476837158
+1yza08	3	4	1.2999999523162842
+1bcd09	1	2	1.399999976158142
+1efg10	2	3	1.0
+1hij11	3	4	1.100000023841858
+1klm12	1	2	1.2000000476837158
+1nop13	2	3	1.2999999523162842
+1qrs14	3	4	1.399999976158142
+1tuv15	1	2	1.0
+1wxy16	2	3	1.100000023841858
+1zab17	3	4	1.2000000476837158
+1cde18	1	2	1.2999999523162842
+1fgh19	2	3	1.399999976158142
+1ijk20	3	4	1.0
diff --git ql/src/test/results/clientpositive/parquet_type_promotion.q.out ql/src/test/results/clientpositive/parquet_type_promotion.q.out
new file mode 100644
index 0000000..15a29e3
--- /dev/null
+++ ql/src/test/results/clientpositive/parquet_type_promotion.q.out
@@ -0,0 +1,232 @@
+PREHOOK: query: DROP TABLE parquet_type_promotion_staging
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE parquet_type_promotion_staging
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE parquet_type_promotion
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE parquet_type_promotion
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE parquet_type_promotion_staging (
+  cint int,
+  clong bigint,
+  cfloat float,
+  cdouble double,
+  m1 map<string, int>,
+  l1 array<int>,
+  st1 struct<c1:int, c2:int>,
+  fm1 map<string, float>,
+  fl1 array<float>,
+  fst1 struct<c1:float, c2:double>
+) ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+COLLECTION ITEMS TERMINATED BY ','
+MAP KEYS TERMINATED BY ':'
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@parquet_type_promotion_staging
+POSTHOOK: query: CREATE TABLE parquet_type_promotion_staging (
+  cint int,
+  clong bigint,
+  cfloat float,
+  cdouble double,
+  m1 map<string, int>,
+  l1 array<int>,
+  st1 struct<c1:int, c2:int>,
+  fm1 map<string, float>,
+  fl1 array<float>,
+  fst1 struct<c1:float, c2:double>
+) ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+COLLECTION ITEMS TERMINATED BY ','
+MAP KEYS TERMINATED BY ':'
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_type_promotion_staging
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/parquet_type_promotion.txt' OVERWRITE INTO TABLE parquet_type_promotion_staging
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@parquet_type_promotion_staging
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/parquet_type_promotion.txt' OVERWRITE INTO TABLE parquet_type_promotion_staging
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@parquet_type_promotion_staging
+PREHOOK: query: SELECT * FROM parquet_type_promotion_staging
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_type_promotion_staging
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM parquet_type_promotion_staging
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_type_promotion_staging
+#### A masked pattern was here ####
+100	5643	0.3	0.7	{"k1":11}	[7,17,22]	{"c1":10,"c2":20}	{"k11":4.0}	[2.3,3.0,5.5]	{"c1":5.7,"c2":4.8}
+200	5643	0.4	0.8	{"k2":14}	[8,17,24]	{"c1":20,"c2":20}	{"v11":5.0}	[3.3,3.1,5.6]	{"c1":5.8,"c2":4.7}
+300	7643	0.4	0.9	{"k3":12}	[9,17,25]	{"c1":30,"c2":60}	{"b11":6.0}	[4.3,3.2,5.7]	{"c1":5.9,"c2":4.6}
+400	8643	0.4	0.5	{"k4":15}	[7,18,27]	{"c1":50,"c2":70}	{"d11":8.0}	[6.3,3.3,5.8]	{"c1":5.0,"c2":4.5}
+PREHOOK: query: CREATE TABLE parquet_type_promotion (
+  cint int,
+  clong bigint,
+  cfloat float,
+  cdouble double,
+  m1 map<string, int>,
+  l1 array<int>,
+  st1 struct<c1:int, c2:int>,
+  fm1 map<string, float>,
+  fl1 array<float>,
+  fst1 struct<c1:float, c2:double>
+
+) STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@parquet_type_promotion
+POSTHOOK: query: CREATE TABLE parquet_type_promotion (
+  cint int,
+  clong bigint,
+  cfloat float,
+  cdouble double,
+  m1 map<string, int>,
+  l1 array<int>,
+  st1 struct<c1:int, c2:int>,
+  fm1 map<string, float>,
+  fl1 array<float>,
+  fst1 struct<c1:float, c2:double>
+
+) STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_type_promotion
+PREHOOK: query: INSERT OVERWRITE TABLE parquet_type_promotion
+  SELECT * FROM parquet_type_promotion_staging
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_type_promotion_staging
+PREHOOK: Output: default@parquet_type_promotion
+POSTHOOK: query: INSERT OVERWRITE TABLE parquet_type_promotion
+  SELECT * FROM parquet_type_promotion_staging
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_type_promotion_staging
+POSTHOOK: Output: default@parquet_type_promotion
+POSTHOOK: Lineage: parquet_type_promotion.cdouble SIMPLE [(parquet_type_promotion_staging)parquet_type_promotion_staging.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: parquet_type_promotion.cfloat SIMPLE [(parquet_type_promotion_staging)parquet_type_promotion_staging.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: parquet_type_promotion.cint SIMPLE [(parquet_type_promotion_staging)parquet_type_promotion_staging.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: parquet_type_promotion.clong SIMPLE [(parquet_type_promotion_staging)parquet_type_promotion_staging.FieldSchema(name:clong, type:bigint, comment:null), ]
+POSTHOOK: Lineage: parquet_type_promotion.fl1 SIMPLE [(parquet_type_promotion_staging)parquet_type_promotion_staging.FieldSchema(name:fl1, type:array<float>, comment:null), ]
+POSTHOOK: Lineage: parquet_type_promotion.fm1 SIMPLE [(parquet_type_promotion_staging)parquet_type_promotion_staging.FieldSchema(name:fm1, type:map<string,float>, comment:null), ]
+POSTHOOK: Lineage: parquet_type_promotion.fst1 SIMPLE [(parquet_type_promotion_staging)parquet_type_promotion_staging.FieldSchema(name:fst1, type:struct<c1:float,c2:double>, comment:null), ]
+POSTHOOK: Lineage: parquet_type_promotion.l1 SIMPLE [(parquet_type_promotion_staging)parquet_type_promotion_staging.FieldSchema(name:l1, type:array<int>, comment:null), ]
+POSTHOOK: Lineage: parquet_type_promotion.m1 SIMPLE [(parquet_type_promotion_staging)parquet_type_promotion_staging.FieldSchema(name:m1, type:map<string,int>, comment:null), ]
+POSTHOOK: Lineage: parquet_type_promotion.st1 SIMPLE [(parquet_type_promotion_staging)parquet_type_promotion_staging.FieldSchema(name:st1, type:struct<c1:int,c2:int>, comment:null), ]
+PREHOOK: query: SELECT * FROM parquet_type_promotion
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_type_promotion
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM parquet_type_promotion
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_type_promotion
+#### A masked pattern was here ####
+100	5643	0.3	0.7	{"k1":11}	[7,17,22]	{"c1":10,"c2":20}	{"k11":4.0}	[2.3,3.0,5.5]	{"c1":5.7,"c2":4.8}
+200	5643	0.4	0.8	{"k2":14}	[8,17,24]	{"c1":20,"c2":20}	{"v11":5.0}	[3.3,3.1,5.6]	{"c1":5.8,"c2":4.7}
+300	7643	0.4	0.9	{"k3":12}	[9,17,25]	{"c1":30,"c2":60}	{"b11":6.0}	[4.3,3.2,5.7]	{"c1":5.9,"c2":4.6}
+400	8643	0.4	0.5	{"k4":15}	[7,18,27]	{"c1":50,"c2":70}	{"d11":8.0}	[6.3,3.3,5.8]	{"c1":5.0,"c2":4.5}
+PREHOOK: query: ALTER TABLE parquet_type_promotion REPLACE COLUMNS(
+  cint bigint,
+  clong bigint,
+  cfloat double,
+  cdouble double,
+  m1 map<string, bigint>,
+  l1 array<bigint>,
+  st1 struct<c1:bigint, c2:bigint>,
+  fm1 map<string, double>,
+  fl1 array<double>,
+  fst1 struct<c1:double, c2:double>
+)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@parquet_type_promotion
+PREHOOK: Output: default@parquet_type_promotion
+POSTHOOK: query: ALTER TABLE parquet_type_promotion REPLACE COLUMNS(
+  cint bigint,
+  clong bigint,
+  cfloat double,
+  cdouble double,
+  m1 map<string, bigint>,
+  l1 array<bigint>,
+  st1 struct<c1:bigint, c2:bigint>,
+  fm1 map<string, double>,
+  fl1 array<double>,
+  fst1 struct<c1:double, c2:double>
+)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@parquet_type_promotion
+POSTHOOK: Output: default@parquet_type_promotion
+PREHOOK: query: SELECT * FROM parquet_type_promotion
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_type_promotion
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM parquet_type_promotion
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_type_promotion
+#### A masked pattern was here ####
+100	5643	0.30000001192092896	0.7	{"k1":11}	[7,17,22]	{"c1":10,"c2":20}	{"k11":4.0}	[2.299999952316284,3.0,5.5]	{"c1":5.699999809265137,"c2":4.8}
+200	5643	0.4000000059604645	0.8	{"k2":14}	[8,17,24]	{"c1":20,"c2":20}	{"v11":5.0}	[3.299999952316284,3.0999999046325684,5.599999904632568]	{"c1":5.800000190734863,"c2":4.7}
+300	7643	0.4000000059604645	0.9	{"k3":12}	[9,17,25]	{"c1":30,"c2":60}	{"b11":6.0}	[4.300000190734863,3.200000047683716,5.699999809265137]	{"c1":5.900000095367432,"c2":4.6}
+400	8643	0.4000000059604645	0.5	{"k4":15}	[7,18,27]	{"c1":50,"c2":70}	{"d11":8.0}	[6.300000190734863,3.299999952316284,5.800000190734863]	{"c1":5.0,"c2":4.5}
+PREHOOK: query: -- This test covers the case where array<struct<f1,f2,..,fn>> data
+-- can be retrieved using map<k,v>.
+-- It also tests that an array of structs with more than 2 fields
+-- can be read as map<k,v> as well.
+
+DROP TABLE arrays_of_struct_to_map
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- This test covers the case where array<struct<f1,f2,..,fn>> data
+-- can be retrieved using map<k,v>.
+-- It also tests that an array of structs with more than 2 fields
+-- can be read as map<k,v> as well.
+
+DROP TABLE arrays_of_struct_to_map
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE arrays_of_struct_to_map (locations1 array<struct<c1:int,c2:int>>, locations2 array<struct<f1:int,f2:int,f3:int>>) STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@arrays_of_struct_to_map
+POSTHOOK: query: CREATE TABLE arrays_of_struct_to_map (locations1 array<struct<c1:int,c2:int>>, locations2 array<struct<f1:int,f2:int,f3:int>>) STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@arrays_of_struct_to_map
+PREHOOK: query: INSERT INTO TABLE arrays_of_struct_to_map select array(named_struct("c1",1,"c2",2)), array(named_struct("f1",
+77,"f2",88,"f3",99)) FROM parquet_type_promotion LIMIT 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_type_promotion
+PREHOOK: Output: default@arrays_of_struct_to_map
+POSTHOOK: query: INSERT INTO TABLE arrays_of_struct_to_map select array(named_struct("c1",1,"c2",2)), array(named_struct("f1",
+77,"f2",88,"f3",99)) FROM parquet_type_promotion LIMIT 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_type_promotion
+POSTHOOK: Output: default@arrays_of_struct_to_map
+POSTHOOK: Lineage: arrays_of_struct_to_map.locations1 EXPRESSION []
+POSTHOOK: Lineage: arrays_of_struct_to_map.locations2 EXPRESSION []
+PREHOOK: query: SELECT * FROM arrays_of_struct_to_map
+PREHOOK: type: QUERY
+PREHOOK: Input: default@arrays_of_struct_to_map
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM arrays_of_struct_to_map
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@arrays_of_struct_to_map
+#### A masked pattern was here ####
+[{"c1":1,"c2":2}]	[{"f1":77,"f2":88,"f3":99}]
+PREHOOK: query: ALTER TABLE arrays_of_struct_to_map REPLACE COLUMNS (locations1 map<int,int>, locations2 map<int,int>)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@arrays_of_struct_to_map
+PREHOOK: Output: default@arrays_of_struct_to_map
+POSTHOOK: query: ALTER TABLE arrays_of_struct_to_map REPLACE COLUMNS (locations1 map<int,int>, locations2 map<int,int>)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@arrays_of_struct_to_map
+POSTHOOK: Output: default@arrays_of_struct_to_map
+PREHOOK: query: SELECT * FROM arrays_of_struct_to_map
+PREHOOK: type: QUERY
+PREHOOK: Input: default@arrays_of_struct_to_map
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM arrays_of_struct_to_map
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@arrays_of_struct_to_map
+#### A masked pattern was here ####
+{1:2}	{77:88}