From baf75da607d37494309ee2a13347fe16f310f1aa Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Tue, 18 Nov 2014 16:44:20 -0800
Subject: [PATCH] HIVE-8909: Implement Parquet backward-compat rules.

This implements the rules specified by PARQUET-113 for reading existing list
and map structures in different Parquet object models. This adds a
HiveCollectionConverter that implements the rules for LIST- and MAP-annotated
types. It also adds support for un-annotated repeated groups according to the
spec. Existing data is produced and validated in TestArrayCompatibility.
---
 .../convert/ArrayWritableGroupConverter.java       |  86 ---
 .../ql/io/parquet/convert/ConverterParent.java     |   7 +
 .../convert/DataWritableGroupConverter.java        | 140 ----
 .../convert/DataWritableRecordConverter.java       |  10 +-
 .../hive/ql/io/parquet/convert/ETypeConverter.java |  28 +-
 .../parquet/convert/HiveCollectionConverter.java   | 164 +++++
 .../ql/io/parquet/convert/HiveGroupConverter.java  |  59 +-
 .../ql/io/parquet/convert/HiveStructConverter.java | 129 ++++
 .../hive/ql/io/parquet/convert/Repeated.java       | 155 +++++
 .../hive/ql/io/parquet/TestArrayCompatibility.java | 758 +++++++++++++++++++++
 10 files changed, 1274 insertions(+), 262 deletions(-)
 delete mode 100644 ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java
 create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ConverterParent.java
 delete mode 100644 ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java
 create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java
 create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java
 create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java
 create mode 100644 ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestArrayCompatibility.java

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java
deleted file mode 100644
index 052b36d..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.hadoop.hive.ql.io.parquet.convert; - -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.Writable; - -import parquet.io.ParquetDecodingException; -import parquet.io.api.Converter; -import parquet.schema.GroupType; - -public class ArrayWritableGroupConverter extends HiveGroupConverter { - - private final Converter[] converters; - private final HiveGroupConverter parent; - private final int index; - private final boolean isMap; - private Writable currentValue; - private Writable[] mapPairContainer; - - public ArrayWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent, - final int index) { - this.parent = parent; - this.index = index; - int count = groupType.getFieldCount(); - if (count < 1 || count > 2) { - throw new IllegalStateException("Field count must be either 1 or 2: " + count); - } - isMap = count == 2; - converters = new Converter[count]; - for (int i = 0; i < count; i++) { - converters[i] = getConverterFromDescription(groupType.getType(i), i, this); - } - } - - @Override - public Converter getConverter(final int fieldIndex) { - return converters[fieldIndex]; - } - - @Override - public void start() { - if (isMap) { - mapPairContainer = new Writable[2]; - } - currentValue = null; - } - - @Override - public void end() { - if (isMap) { - currentValue = new ArrayWritable(Writable.class, mapPairContainer); - } - parent.add(index, currentValue); - } - - @Override - protected void set(final int index, final Writable value) { - if (index != 0 && mapPairContainer == null || index > 1) { - throw new ParquetDecodingException("Repeated group can only have one or two fields for maps." + - " Not allowed to set for the index : " + index); - } - - if (isMap) { - mapPairContainer[index] = value; - } else { - currentValue = value; - } - } - - @Override - protected void add(final int index, final Writable value) { - set(index, value); - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ConverterParent.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ConverterParent.java new file mode 100644 index 0000000..a86d6f4 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ConverterParent.java @@ -0,0 +1,7 @@ +package org.apache.hadoop.hive.ql.io.parquet.convert; + +import org.apache.hadoop.io.Writable; + +interface ConverterParent { + void set(int index, Writable value); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java deleted file mode 100644 index 0e310fb..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.io.parquet.convert; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.Writable; - -import parquet.io.api.Converter; -import parquet.schema.GroupType; -import parquet.schema.Type; - -/** - * - * A MapWritableGroupConverter, real converter between hive and parquet types recursively for complex types. - * - */ -public class DataWritableGroupConverter extends HiveGroupConverter { - - private final Converter[] converters; - private final HiveGroupConverter parent; - private final int index; - private final Object[] currentArr; - private Writable[] rootMap; - - public DataWritableGroupConverter(final GroupType requestedSchema, final GroupType tableSchema) { - this(requestedSchema, null, 0, tableSchema); - final int fieldCount = tableSchema.getFieldCount(); - this.rootMap = new Writable[fieldCount]; - } - - public DataWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent, - final int index) { - this(groupType, parent, index, groupType); - } - - public DataWritableGroupConverter(final GroupType selectedGroupType, - final HiveGroupConverter parent, final int index, final GroupType containingGroupType) { - this.parent = parent; - this.index = index; - final int totalFieldCount = containingGroupType.getFieldCount(); - final int selectedFieldCount = selectedGroupType.getFieldCount(); - - currentArr = new Object[totalFieldCount]; - converters = new Converter[selectedFieldCount]; - - List selectedFields = selectedGroupType.getFields(); - for (int i = 0; i < selectedFieldCount; i++) { - Type subtype = selectedFields.get(i); - if (containingGroupType.getFields().contains(subtype)) { - converters[i] = getConverterFromDescription(subtype, - containingGroupType.getFieldIndex(subtype.getName()), this); - } else { - throw new IllegalStateException("Group type [" + containingGroupType + - "] does not contain requested field: " + subtype); - } - } - } - - public final ArrayWritable getCurrentArray() { - final Writable[] writableArr; - if (this.rootMap != null) { // We're at the root : we can safely re-use the same map to save perf - writableArr = this.rootMap; - } else { - writableArr = new Writable[currentArr.length]; - } - - for (int i = 0; i < currentArr.length; i++) { - final Object obj = currentArr[i]; - if (obj instanceof List) { - final List objList = (List)obj; - final ArrayWritable arr = new ArrayWritable(Writable.class, - objList.toArray(new Writable[objList.size()])); - writableArr[i] = arr; - } else { - writableArr[i] = (Writable) obj; - } - } - return new ArrayWritable(Writable.class, writableArr); - } - - @Override - final protected void set(final int index, final Writable value) { - currentArr[index] = value; - } - - @Override - public Converter getConverter(final int fieldIndex) { - return converters[fieldIndex]; - } - - @Override - public void start() { - for (int i = 0; i < currentArr.length; i++) { - currentArr[i] = null; - } - } - - @Override - public void end() { - if (parent != null) { - parent.set(index, getCurrentArray()); - } - } - - @Override - protected void add(final int index, final Writable value) { - if (currentArr[index] != null) { - final Object obj = currentArr[index]; - if (obj instanceof List) { - final List list = (List) obj; - list.add(value); - } else { - throw new IllegalStateException("This should be a List: " + obj); - } - } else { - // create a list here because we don't know the final length of the object - // and it is 
more flexible than ArrayWritable. - // - // converted to ArrayWritable by getCurrentArray(). - final List buffer = new ArrayList(); - buffer.add(value); - currentArr[index] = (Object) buffer; - } - - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java index 5a46136..000e8ea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java @@ -13,11 +13,7 @@ */ package org.apache.hadoop.hive.ql.io.parquet.convert; -import java.util.List; - -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.ArrayWritable; - import parquet.io.api.GroupConverter; import parquet.io.api.RecordMaterializer; import parquet.schema.GroupType; @@ -29,10 +25,10 @@ */ public class DataWritableRecordConverter extends RecordMaterializer { - private final DataWritableGroupConverter root; + private final HiveStructConverter root; public DataWritableRecordConverter(final GroupType requestedSchema, final GroupType tableSchema) { - this.root = new DataWritableGroupConverter(requestedSchema, tableSchema); + this.root = new HiveStructConverter(requestedSchema, tableSchema); } @Override @@ -44,4 +40,4 @@ public ArrayWritable getCurrentRecord() { public GroupConverter getRootConverter() { return root; } -} \ No newline at end of file +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java index bce6400..23bb364 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java @@ -47,7 +47,7 @@ EDOUBLE_CONVERTER(Double.TYPE) { @Override - Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) { + PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) { return new PrimitiveConverter() { @Override public void addDouble(final double value) { @@ -58,7 +58,7 @@ public void addDouble(final double value) { }, EBOOLEAN_CONVERTER(Boolean.TYPE) { @Override - Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) { + PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) { return new PrimitiveConverter() { @Override public void addBoolean(final boolean value) { @@ -69,7 +69,7 @@ public void addBoolean(final boolean value) { }, EFLOAT_CONVERTER(Float.TYPE) { @Override - Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) { + PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) { return new PrimitiveConverter() { @Override public void addFloat(final float value) { @@ -80,7 +80,7 @@ public void addFloat(final float value) { }, EINT32_CONVERTER(Integer.TYPE) { @Override - Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) { + PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) { return new PrimitiveConverter() { @Override public void addInt(final int value) { @@ -91,7 +91,7 @@ public void addInt(final int value) { }, 
EINT64_CONVERTER(Long.TYPE) { @Override - Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) { + PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) { return new PrimitiveConverter() { @Override public void addLong(final long value) { @@ -102,7 +102,7 @@ public void addLong(final long value) { }, EBINARY_CONVERTER(Binary.class) { @Override - Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) { + PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) { return new BinaryConverter(type, parent, index) { @Override protected BytesWritable convert(Binary binary) { @@ -113,7 +113,7 @@ protected BytesWritable convert(Binary binary) { }, ESTRING_CONVERTER(String.class) { @Override - Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) { + PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) { return new BinaryConverter(type, parent, index) { @Override protected Text convert(Binary binary) { @@ -124,7 +124,7 @@ protected Text convert(Binary binary) { }, EDECIMAL_CONVERTER(BigDecimal.class) { @Override - Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) { + PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) { return new BinaryConverter(type, parent, index) { @Override protected HiveDecimalWritable convert(Binary binary) { @@ -135,7 +135,7 @@ protected HiveDecimalWritable convert(Binary binary) { }, ETIMESTAMP_CONVERTER(TimestampWritable.class) { @Override - Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) { + PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) { return new BinaryConverter(type, parent, index) { @Override protected TimestampWritable convert(Binary binary) { @@ -157,10 +157,10 @@ private ETypeConverter(final Class type) { return _type; } - abstract Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent); + abstract PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent); - public static Converter getNewConverter(final PrimitiveType type, final int index, - final HiveGroupConverter parent) { + public static PrimitiveConverter getNewConverter(final PrimitiveType type, final int index, + final ConverterParent parent) { if (type.isPrimitive() && (type.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.INT96))) { //TODO- cleanup once parquet support Timestamp type annotation. 
return ETypeConverter.ETIMESTAMP_CONVERTER.getConverter(type, index, parent); @@ -183,11 +183,11 @@ public static Converter getNewConverter(final PrimitiveType type, final int inde public abstract static class BinaryConverter extends PrimitiveConverter { protected final PrimitiveType type; - private final HiveGroupConverter parent; + private final ConverterParent parent; private final int index; private ArrayList lookupTable; - public BinaryConverter(PrimitiveType type, HiveGroupConverter parent, int index) { + public BinaryConverter(PrimitiveType type, ConverterParent parent, int index) { this.type = type; this.parent = parent; this.index = index; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java new file mode 100644 index 0000000..872900b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java @@ -0,0 +1,164 @@ +package org.apache.hadoop.hive.ql.io.parquet.convert; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; +import parquet.io.api.Converter; +import parquet.schema.GroupType; +import parquet.schema.Type; + +public class HiveCollectionConverter extends HiveGroupConverter { + private final GroupType collectionType; + private final ConverterParent parent; + private final int index; + private final Converter innerConverter; + private final List list = new ArrayList(); + + public static HiveGroupConverter forMap(GroupType mapType, + ConverterParent parent, + int index) { + return new HiveCollectionConverter( + mapType, parent, index, true /* its a map */ ); + } + + public static HiveGroupConverter forList(GroupType listType, + ConverterParent parent, + int index) { + return new HiveCollectionConverter( + listType, parent, index, false /* not a map */ ); + } + + private HiveCollectionConverter(GroupType collectionType, + ConverterParent parent, + int index, boolean isMap) { + this.collectionType = collectionType; + this.parent = parent; + this.index = index; + Type repeatedType = collectionType.getType(0); + if (isMap) { + this.innerConverter = new KeyValueConverter( + repeatedType.asGroupType(), this); + } else if (isElementType(repeatedType, collectionType.getName())) { + this.innerConverter = getConverterFromDescription(repeatedType, 0, this); + } else { + this.innerConverter = new ElementConverter( + repeatedType.asGroupType(), this); + } + } + + @Override + public Converter getConverter(int fieldIndex) { + Preconditions.checkArgument( + fieldIndex == 0, "Invalid field index: " + fieldIndex); + return innerConverter; + } + + @Override + public void start() { + list.clear(); + } + + @Override + public void end() { + parent.set(index, wrapList(new ArrayWritable( + Writable.class, list.toArray(new Writable[list.size()])))); + } + + @Override + public void set(int index, Writable value) { + list.add(value); + } + + private static class KeyValueConverter extends HiveGroupConverter { + private final HiveGroupConverter parent; + private final Converter keyConverter; + private final Converter valueConverter; + private Writable[] keyValue = null; + + public KeyValueConverter(GroupType keyValueType, HiveGroupConverter parent) { + this.parent = parent; + this.keyConverter = getConverterFromDescription( + keyValueType.getType(0), 0, this); + this.valueConverter = 
getConverterFromDescription( + keyValueType.getType(1), 1, this); + } + + @Override + public void set(int fieldIndex, Writable value) { + keyValue[fieldIndex] = value; + } + + @Override + public Converter getConverter(int fieldIndex) { + switch (fieldIndex) { + case 0: + return keyConverter; + case 1: + return valueConverter; + default: + throw new IllegalArgumentException( + "Invalid field index for map key-value: " + fieldIndex); + } + } + + @Override + public void start() { + this.keyValue = new Writable[2]; + } + + @Override + public void end() { + parent.set(0, new ArrayWritable(Writable.class, keyValue)); + } + } + + private static class ElementConverter extends HiveGroupConverter { + private final HiveGroupConverter parent; + private final Converter elementConverter; + private Writable element = null; + + public ElementConverter(GroupType repeatedType, HiveGroupConverter parent) { + this.parent = parent; + this.elementConverter = getConverterFromDescription( + repeatedType.getType(0), 0, this); + } + + @Override + public void set(int index, Writable value) { + this.element = value; + } + + @Override + public Converter getConverter(int i) { + return elementConverter; + } + + @Override + public void start() { + this.element = null; + } + + @Override + public void end() { + parent.set(0, element); + } + } + + private static boolean isElementType(Type repeatedType, String parentName) { + if (repeatedType.isPrimitive() || + (repeatedType.asGroupType().getFieldCount() != 1)) { + return true; + } else if (repeatedType.getName().equals("array")) { + return true; // existing avro data + } else if (repeatedType.getName().equals(parentName + "_tuple")) { + return true; // existing thrift data + } + // false for the following cases: + // * name is "list", which matches the spec + // * name is "bag", which indicates existing hive or pig data + // * ambiguous case, which should be assumed is 3-level according to spec + return false; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java index 78bdf62..11772be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java @@ -13,33 +13,62 @@ */ package org.apache.hadoop.hive.ql.io.parquet.convert; +import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.Writable; - import parquet.io.api.Converter; import parquet.io.api.GroupConverter; +import parquet.io.api.PrimitiveConverter; +import parquet.schema.GroupType; +import parquet.schema.OriginalType; +import parquet.schema.PrimitiveType; import parquet.schema.Type; -import parquet.schema.Type.Repetition; -public abstract class HiveGroupConverter extends GroupConverter { +public abstract class HiveGroupConverter extends GroupConverter implements ConverterParent { + + protected static PrimitiveConverter getConverterFromDescription(PrimitiveType type, int index, ConverterParent parent) { + if (type == null) { + return null; + } + + return ETypeConverter.getNewConverter(type, index, parent); + } + + protected static HiveGroupConverter getConverterFromDescription(GroupType type, int index, ConverterParent parent) { + if (type == null) { + return null; + } + + OriginalType annotation = type.getOriginalType(); + if (annotation == OriginalType.LIST) { + return HiveCollectionConverter.forList(type, parent, index); + } else if (annotation == 
OriginalType.MAP) { + return HiveCollectionConverter.forMap(type, parent, index); + } - protected static Converter getConverterFromDescription(final Type type, final int index, - final HiveGroupConverter parent) { + return new HiveStructConverter(type, parent, index); + } + + protected static Converter getConverterFromDescription(Type type, int index, ConverterParent parent) { if (type == null) { return null; } + if (type.isPrimitive()) { - return ETypeConverter.getNewConverter(type.asPrimitiveType(), index, parent); - } else { - if (type.asGroupType().getRepetition() == Repetition.REPEATED) { - return new ArrayWritableGroupConverter(type.asGroupType(), parent, index); - } else { - return new DataWritableGroupConverter(type.asGroupType(), parent, index); - } + return getConverterFromDescription(type.asPrimitiveType(), index, parent); } + + return getConverterFromDescription(type.asGroupType(), index, parent); } - protected abstract void set(int index, Writable value); + /** + * The original list and map conversion didn't remove the synthetic layer and + * the ObjectInspector had to remove it. This is a temporary fix that adds an + * extra layer for the ObjectInspector to remove. + */ + static ArrayWritable wrapList(ArrayWritable list) { + return new ArrayWritable(Writable.class, new Writable[] {list}); + } - protected abstract void add(int index, Writable value); + public abstract void set(int index, Writable value); -} \ No newline at end of file +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java new file mode 100644 index 0000000..9778928 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java @@ -0,0 +1,129 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.parquet.convert; + +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; +import parquet.io.api.Converter; +import parquet.schema.GroupType; +import parquet.schema.Type; + +/** + * + * A MapWritableGroupConverter, real converter between hive and parquet types recursively for complex types. 
+ * + */ +public class HiveStructConverter extends HiveGroupConverter { + + private final Converter[] converters; + private final ConverterParent parent; + private final int index; + private Writable[] writables; + private final List repeatedConverters; + private boolean reuseWritableArray = false; + + public HiveStructConverter(final GroupType requestedSchema, final GroupType tableSchema) { + this(requestedSchema, null, 0, tableSchema); + this.reuseWritableArray = true; + this.writables = new Writable[converters.length]; + } + + public HiveStructConverter(final GroupType groupType, final ConverterParent parent, + final int index) { + this(groupType, parent, index, groupType); + } + + public HiveStructConverter(final GroupType selectedGroupType, + final ConverterParent parent, final int index, final GroupType containingGroupType) { + this.parent = parent; + this.index = index; + final int selectedFieldCount = selectedGroupType.getFieldCount(); + + converters = new Converter[selectedFieldCount]; + this.repeatedConverters = new ArrayList(); + + List selectedFields = selectedGroupType.getFields(); + for (int i = 0; i < selectedFieldCount; i++) { + Type subtype = selectedFields.get(i); + int fieldIndex = containingGroupType.getFieldIndex(subtype.getName()); + if (containingGroupType.getFields().contains(subtype)) { + converters[i] = getFieldConverter(subtype, fieldIndex); + } else { + throw new IllegalStateException("Group type [" + containingGroupType + + "] does not contain requested field: " + subtype); + } + } + } + + private Converter getFieldConverter(Type type, int fieldIndex) { + Converter converter; + if (type.isRepetition(Type.Repetition.REPEATED)) { + if (type.isPrimitive()) { + converter = new Repeated.RepeatedPrimitiveConverter( + type.asPrimitiveType(), this, fieldIndex); + } else { + converter = new Repeated.RepeatedGroupConverter( + type.asGroupType(), this, fieldIndex); + } + + repeatedConverters.add((Repeated) converter); + } else { + converter = getConverterFromDescription(type, fieldIndex, this); + } + + return converter; + } + + public final ArrayWritable getCurrentArray() { + return new ArrayWritable(Writable.class, writables); + } + + @Override + public void set(int fieldIndex, Writable value) { + writables[fieldIndex] = value; + } + + @Override + public Converter getConverter(final int fieldIndex) { + return converters[fieldIndex]; + } + + @Override + public void start() { + if (reuseWritableArray) { + // reset the array to null values + for (int i = 0; i < writables.length; i += 1) { + writables[i] = null; + } + } else { + this.writables = new Writable[converters.length]; + } + for (Repeated repeated : repeatedConverters) { + repeated.parentStart(); + } + } + + @Override + public void end() { + for (Repeated repeated : repeatedConverters) { + repeated.parentEnd(); + } + if (parent != null) { + parent.set(index, getCurrentArray()); + } + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java new file mode 100644 index 0000000..af28b4c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java @@ -0,0 +1,155 @@ +package org.apache.hadoop.hive.ql.io.parquet.convert; + +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; +import parquet.column.Dictionary; +import parquet.io.api.Binary; +import parquet.io.api.Converter; +import 
parquet.io.api.PrimitiveConverter; +import parquet.schema.GroupType; +import parquet.schema.PrimitiveType; + +/** + * Converters for repeated fields need to know when the parent field starts and + * ends to correctly build lists from the repeated values. + */ +public interface Repeated extends ConverterParent { + + public void parentStart(); + + public void parentEnd(); + + /** + * Stands in for a PrimitiveConverter and accumulates multiple values as an + * ArrayWritable. + */ + class RepeatedPrimitiveConverter extends PrimitiveConverter implements Repeated { + private final PrimitiveType primitiveType; + private final PrimitiveConverter wrapped; + private final ConverterParent parent; + private final int index; + private final List list = new ArrayList(); + + public RepeatedPrimitiveConverter(PrimitiveType primitiveType, ConverterParent parent, int index) { + this.primitiveType = primitiveType; + this.parent = parent; + this.index = index; + this.wrapped = HiveGroupConverter.getConverterFromDescription(primitiveType, 0, this); + } + + @Override + public boolean hasDictionarySupport() { + return wrapped.hasDictionarySupport(); + } + + @Override + public void setDictionary(Dictionary dictionary) { + wrapped.setDictionary(dictionary); + } + + @Override + public void addValueFromDictionary(int dictionaryId) { + wrapped.addValueFromDictionary(dictionaryId); + } + + @Override + public void addBinary(Binary value) { + wrapped.addBinary(value); + } + + @Override + public void addBoolean(boolean value) { + wrapped.addBoolean(value); + } + + @Override + public void addDouble(double value) { + wrapped.addDouble(value); + } + + @Override + public void addFloat(float value) { + wrapped.addFloat(value); + } + + @Override + public void addInt(int value) { + wrapped.addInt(value); + } + + @Override + public void addLong(long value) { + wrapped.addLong(value); + } + + @Override + public void parentStart() { + list.clear(); + } + + @Override + public void parentEnd() { + parent.set(index, HiveGroupConverter.wrapList(new ArrayWritable( + Writable.class, list.toArray(new Writable[list.size()])))); + } + + @Override + public void set(int index, Writable value) { + list.add(value); + } + } + + /** + * Stands in for a HiveGroupConverter and accumulates multiple values as an + * ArrayWritable. 
+ */ + class RepeatedGroupConverter extends HiveGroupConverter + implements Repeated { + private final GroupType groupType; + private final HiveGroupConverter wrapped; + private final ConverterParent parent; + private final int index; + private final List list = new ArrayList(); + + public RepeatedGroupConverter(GroupType groupType, ConverterParent parent, int index) { + this.groupType = groupType; + this.parent = parent; + this.index = index; + this.wrapped = HiveGroupConverter.getConverterFromDescription(groupType, 0, this); + } + + @Override + public void set(int fieldIndex, Writable value) { + list.add(value); + } + + @Override + public Converter getConverter(int fieldIndex) { + // delegate to the group's converters + return wrapped.getConverter(fieldIndex); + } + + @Override + public void start() { + wrapped.start(); + } + + @Override + public void end() { + wrapped.end(); + } + + @Override + public void parentStart() { + list.clear(); + } + + @Override + public void parentEnd() { + parent.set(index, wrapList(new ArrayWritable( + Writable.class, list.toArray(new Writable[list.size()])))); + } + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestArrayCompatibility.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestArrayCompatibility.java new file mode 100644 index 0000000..53d5cb5 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestArrayCompatibility.java @@ -0,0 +1,758 @@ +package org.apache.hadoop.hive.ql.io.parquet; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import parquet.hadoop.ParquetWriter; +import parquet.hadoop.api.WriteSupport; +import parquet.io.api.RecordConsumer; +import parquet.schema.MessageType; +import parquet.schema.Types; + +import static parquet.schema.OriginalType.LIST; +import static parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; +import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; + +public class TestArrayCompatibility { + + public static FileSystem localFS = null; + + @BeforeClass + public static void initializeFS() throws IOException { + localFS = FileSystem.getLocal(new Configuration()); + } + + @Rule + public final TemporaryFolder tempDir = new TemporaryFolder(); + + @Test + public void testUnannotatedListOfPrimitives() throws Exception { + MessageType fileSchema = Types.buildMessage() + .repeated(INT32).named("list_of_ints") + .named("UnannotatedListOfPrimitives"); + + Path test = writeDirect( + fileSchema, + new DirectWriter() { + @Override + public void write(RecordConsumer rc) { + rc.startMessage(); + 
rc.startField("list_of_ints", 0); + + rc.addInteger(34); + rc.addInteger(35); + rc.addInteger(36); + + rc.endField("list_of_ints", 0); + rc.endMessage(); + } + }); + + ArrayWritable expected = record(list( + new IntWritable(34), new IntWritable(35), new IntWritable(36))); + + List records = read(test); + Assert.assertEquals("Should have only one record", 1, records.size()); + assertEquals("Should match expected record", + expected, records.get(0)); + } + + @Test + public void testUnannotatedListOfGroups() throws Exception { + Path test = writeDirect( + Types.buildMessage() + .repeatedGroup() + .required(FLOAT).named("x") + .required(FLOAT).named("y") + .named("list_of_points") + .named("UnannotatedListOfGroups"), + new DirectWriter() { + @Override + public void write(RecordConsumer rc) { + rc.startMessage(); + rc.startField("list_of_points", 0); + + rc.startGroup(); + rc.startField("x", 0); + rc.addFloat(1.0f); + rc.endField("x", 0); + rc.startField("y", 1); + rc.addFloat(1.0f); + rc.endField("y", 1); + rc.endGroup(); + + rc.startGroup(); + rc.startField("x", 0); + rc.addFloat(2.0f); + rc.endField("x", 0); + rc.startField("y", 1); + rc.addFloat(2.0f); + rc.endField("y", 1); + rc.endGroup(); + + rc.endField("list_of_points", 0); + rc.endMessage(); + } + }); + + ArrayWritable expected = record(list( + record(new FloatWritable(1.0f), new FloatWritable(1.0f)), + record(new FloatWritable(2.0f), new FloatWritable(2.0f)))); + + List records = read(test); + Assert.assertEquals("Should have only one record", 1, records.size()); + assertEquals("Should match expected record", + expected, records.get(0)); + } + + @Test + public void testThriftPrimitiveInList() throws Exception { + Path test = writeDirect( + Types.buildMessage() + .requiredGroup().as(LIST) + .repeated(INT32).named("list_of_ints_tuple") + .named("list_of_ints") + .named("ThriftPrimitiveInList"), + new DirectWriter() { + @Override + public void write(RecordConsumer rc) { + rc.startMessage(); + rc.startField("list_of_ints", 0); + + rc.startGroup(); + rc.startField("list_of_ints_tuple", 0); + + rc.addInteger(34); + rc.addInteger(35); + rc.addInteger(36); + + rc.endField("list_of_ints_tuple", 0); + rc.endGroup(); + + rc.endField("list_of_ints", 0); + rc.endMessage(); + } + }); + + ArrayWritable expected = record(list( + new IntWritable(34), new IntWritable(35), new IntWritable(36))); + + List records = read(test); + Assert.assertEquals("Should have only one record", 1, records.size()); + assertEquals("Should match expected record", expected, records.get(0)); + } + + @Test + public void testThriftSingleFieldGroupInList() throws Exception { + // this tests the case where older data has an ambiguous structure, but the + // correct interpretation can be determined from the repeated name + + Path test = writeDirect( + Types.buildMessage() + .optionalGroup().as(LIST) + .repeatedGroup() + .required(INT64).named("count") + .named("single_element_groups_tuple") + .named("single_element_groups") + .named("ThriftSingleFieldGroupInList"), + new DirectWriter() { + @Override + public void write(RecordConsumer rc) { + rc.startMessage(); + rc.startField("single_element_groups", 0); + + rc.startGroup(); + rc.startField("single_element_groups_tuple", 0); // start writing array contents + + rc.startGroup(); + rc.startField("count", 0); + rc.addLong(1234L); + rc.endField("count", 0); + rc.endGroup(); + + rc.startGroup(); + rc.startField("count", 0); + rc.addLong(2345L); + rc.endField("count", 0); + rc.endGroup(); + + rc.endField("single_element_groups_tuple", 
0); // finished writing array contents + rc.endGroup(); + + rc.endField("single_element_groups", 0); + rc.endMessage(); + } + }); + + ArrayWritable expected = record(list( + record(new LongWritable(1234L)), + record(new LongWritable(2345L)))); + + List records = read(test); + Assert.assertEquals("Should have only one record", 1, records.size()); + assertEquals("Should match expected record", + expected, records.get(0)); + } + + @Test + public void testAvroPrimitiveInList() throws Exception { + Path test = writeDirect( + Types.buildMessage() + .requiredGroup().as(LIST) + .repeated(INT32).named("array") + .named("list_of_ints") + .named("AvroPrimitiveInList"), + new DirectWriter() { + @Override + public void write(RecordConsumer rc) { + rc.startMessage(); + rc.startField("list_of_ints", 0); + + rc.startGroup(); + rc.startField("array", 0); + + rc.addInteger(34); + rc.addInteger(35); + rc.addInteger(36); + + rc.endField("array", 0); + rc.endGroup(); + + rc.endField("list_of_ints", 0); + rc.endMessage(); + } + }); + + ArrayWritable expected = record(list( + new IntWritable(34), new IntWritable(35), new IntWritable(36))); + + List records = read(test); + Assert.assertEquals("Should have only one record", 1, records.size()); + assertEquals("Should match expected record", expected, records.get(0)); + } + + @Test + public void testAvroSingleFieldGroupInList() throws Exception { + // this tests the case where older data has an ambiguous structure, but the + // correct interpretation can be determined from the repeated name, "array" + + Path test = writeDirect( + Types.buildMessage() + .optionalGroup().as(LIST) + .repeatedGroup() + .required(INT64).named("count") + .named("array") + .named("single_element_groups") + .named("AvroSingleFieldGroupInList"), + new DirectWriter() { + @Override + public void write(RecordConsumer rc) { + rc.startMessage(); + rc.startField("single_element_groups", 0); + + rc.startGroup(); + rc.startField("array", 0); // start writing array contents + + rc.startGroup(); + rc.startField("count", 0); + rc.addLong(1234L); + rc.endField("count", 0); + rc.endGroup(); + + rc.startGroup(); + rc.startField("count", 0); + rc.addLong(2345L); + rc.endField("count", 0); + rc.endGroup(); + + rc.endField("array", 0); // finished writing array contents + rc.endGroup(); + + rc.endField("single_element_groups", 0); + rc.endMessage(); + } + }); + + ArrayWritable expected = record(list( + record(new LongWritable(1234L)), + record(new LongWritable(2345L)))); + + List records = read(test); + Assert.assertEquals("Should have only one record", 1, records.size()); + assertEquals("Should match expected record", + expected, records.get(0)); + } + + @Test + public void testAmbiguousSingleFieldGroupInList() throws Exception { + // this tests the case where older data has an ambiguous list and is not + // named indicating that the source considered the group significant + + Path test = writeDirect( + Types.buildMessage() + .optionalGroup().as(LIST) + .repeatedGroup() + .required(INT64).named("count") + .named("single_element_group") + .named("single_element_groups") + .named("SingleFieldGroupInList"), + new DirectWriter() { + @Override + public void write(RecordConsumer rc) { + rc.startMessage(); + rc.startField("single_element_groups", 0); + + rc.startGroup(); + rc.startField("single_element_group", 0); // start writing array contents + + rc.startGroup(); + rc.startField("count", 0); + rc.addLong(1234L); + rc.endField("count", 0); + rc.endGroup(); + + rc.startGroup(); + rc.startField("count", 0); + 
rc.addLong(2345L); + rc.endField("count", 0); + rc.endGroup(); + + rc.endField("single_element_group", 0); // finished writing array contents + rc.endGroup(); + + rc.endField("single_element_groups", 0); + rc.endMessage(); + } + }); + + ArrayWritable expected = record(list( + new LongWritable(1234L), + new LongWritable(2345L))); + + List records = read(test); + Assert.assertEquals("Should have only one record", 1, records.size()); + assertEquals("Should match expected record", + expected, records.get(0)); + } + + @Test + public void testMultiFieldGroupInList() throws Exception { + // tests the missing element layer, detected by a multi-field group + + Path test = writeDirect( + Types.buildMessage() + .optionalGroup().as(LIST) + .repeatedGroup() + .required(DOUBLE).named("latitude") + .required(DOUBLE).named("longitude") + .named("element") // should not affect schema conversion + .named("locations") + .named("MultiFieldGroupInList"), + new DirectWriter() { + @Override + public void write(RecordConsumer rc) { + rc.startMessage(); + rc.startField("locations", 0); + + rc.startGroup(); + rc.startField("element", 0); + + rc.startGroup(); + rc.startField("latitude", 0); + rc.addDouble(0.0); + rc.endField("latitude", 0); + rc.startField("longitude", 1); + rc.addDouble(0.0); + rc.endField("longitude", 1); + rc.endGroup(); + + rc.startGroup(); + rc.startField("latitude", 0); + rc.addDouble(0.0); + rc.endField("latitude", 0); + rc.startField("longitude", 1); + rc.addDouble(180.0); + rc.endField("longitude", 1); + rc.endGroup(); + + rc.endField("element", 0); + rc.endGroup(); + + rc.endField("locations", 0); + rc.endMessage(); + } + }); + + ArrayWritable expected = record(list( + record(new DoubleWritable(0.0), new DoubleWritable(0.0)), + record(new DoubleWritable(0.0), new DoubleWritable(180.0)))); + + List records = read(test); + Assert.assertEquals("Should have only one record", 1, records.size()); + assertEquals("Should match expected record", + expected, records.get(0)); + } + + @Test + public void testNewOptionalGroupInList() throws Exception { + Path test = writeDirect( + Types.buildMessage() + .optionalGroup().as(LIST) + .repeatedGroup() + .optionalGroup() + .required(DOUBLE).named("latitude") + .required(DOUBLE).named("longitude") + .named("element") + .named("list") + .named("locations") + .named("NewOptionalGroupInList"), + new DirectWriter() { + @Override + public void write(RecordConsumer rc) { + rc.startMessage(); + rc.startField("locations", 0); + + rc.startGroup(); + rc.startField("list", 0); // start writing array contents + + // write a non-null element + rc.startGroup(); // array level + rc.startField("element", 0); + + rc.startGroup(); + rc.startField("latitude", 0); + rc.addDouble(0.0); + rc.endField("latitude", 0); + rc.startField("longitude", 1); + rc.addDouble(0.0); + rc.endField("longitude", 1); + rc.endGroup(); + + rc.endField("element", 0); + rc.endGroup(); // array level + + // write a null element (element field is omitted) + rc.startGroup(); // array level + rc.endGroup(); // array level + + // write a second non-null element + rc.startGroup(); // array level + rc.startField("element", 0); + + rc.startGroup(); + rc.startField("latitude", 0); + rc.addDouble(0.0); + rc.endField("latitude", 0); + rc.startField("longitude", 1); + rc.addDouble(180.0); + rc.endField("longitude", 1); + rc.endGroup(); + + rc.endField("element", 0); + rc.endGroup(); // array level + + rc.endField("list", 0); // finished writing array contents + rc.endGroup(); + + rc.endField("locations", 0); + 
rc.endMessage(); + } + }); + + ArrayWritable expected = record(list( + record(new DoubleWritable(0.0), new DoubleWritable(0.0)), + null, + record(new DoubleWritable(0.0), new DoubleWritable(180.0)))); + + List records = read(test); + Assert.assertEquals("Should have only one record", 1, records.size()); + assertEquals("Should match expected record", + expected, records.get(0)); + } + + @Test + public void testNewRequiredGroupInList() throws Exception { + Path test = writeDirect( + Types.buildMessage() + .optionalGroup().as(LIST) + .repeatedGroup() + .requiredGroup() + .required(DOUBLE).named("latitude") + .required(DOUBLE).named("longitude") + .named("element") + .named("list") + .named("locations") + .named("NewRequiredGroupInList"), + new DirectWriter() { + @Override + public void write(RecordConsumer rc) { + rc.startMessage(); + rc.startField("locations", 0); + + rc.startGroup(); + rc.startField("list", 0); // start writing array contents + + // write a non-null element + rc.startGroup(); // array level + rc.startField("element", 0); + + rc.startGroup(); + rc.startField("latitude", 0); + rc.addDouble(0.0); + rc.endField("latitude", 0); + rc.startField("longitude", 1); + rc.addDouble(180.0); + rc.endField("longitude", 1); + rc.endGroup(); + + rc.endField("element", 0); + rc.endGroup(); // array level + + // write a second non-null element + rc.startGroup(); // array level + rc.startField("element", 0); + + rc.startGroup(); + rc.startField("latitude", 0); + rc.addDouble(0.0); + rc.endField("latitude", 0); + rc.startField("longitude", 1); + rc.addDouble(0.0); + rc.endField("longitude", 1); + rc.endGroup(); + + rc.endField("element", 0); + rc.endGroup(); // array level + + rc.endField("list", 0); // finished writing array contents + rc.endGroup(); + + rc.endField("locations", 0); + rc.endMessage(); + } + }); + + ArrayWritable expected = record(list( + record(new DoubleWritable(0.0), new DoubleWritable(180.0)), + record(new DoubleWritable(0.0), new DoubleWritable(0.0)))); + + List records = read(test); + Assert.assertEquals("Should have only one record", 1, records.size()); + assertEquals("Should match expected record", + expected, records.get(0)); + } + + @Test + public void testHiveRequiredGroupInList() throws Exception { + // this matches the list structure that Hive writes + Path test = writeDirect( + Types.buildMessage() + .optionalGroup().as(LIST) + .repeatedGroup() + .requiredGroup() + .required(DOUBLE).named("latitude") + .required(DOUBLE).named("longitude") + .named("element") + .named("bag") + .named("locations") + .named("OldRequiredGroupInList"), + new DirectWriter() { + @Override + public void write(RecordConsumer rc) { + rc.startMessage(); + rc.startField("locations", 0); + + rc.startGroup(); + rc.startField("bag", 0); // start writing array contents + + // write a non-null element + rc.startGroup(); // array level + rc.startField("element", 0); + + rc.startGroup(); + rc.startField("latitude", 0); + rc.addDouble(0.0); + rc.endField("latitude", 0); + rc.startField("longitude", 1); + rc.addDouble(180.0); + rc.endField("longitude", 1); + rc.endGroup(); + + rc.endField("element", 0); + rc.endGroup(); // array level + + // write a second non-null element + rc.startGroup(); // array level + rc.startField("element", 0); + + rc.startGroup(); + rc.startField("latitude", 0); + rc.addDouble(0.0); + rc.endField("latitude", 0); + rc.startField("longitude", 1); + rc.addDouble(0.0); + rc.endField("longitude", 1); + rc.endGroup(); + + rc.endField("element", 0); + rc.endGroup(); // array level 
+ + rc.endField("bag", 0); // finished writing array contents + rc.endGroup(); + + rc.endField("locations", 0); + rc.endMessage(); + } + }); + + ArrayWritable expected = record(list( + record(new DoubleWritable(0.0), new DoubleWritable(180.0)), + record(new DoubleWritable(0.0), new DoubleWritable(0.0)))); + + List records = read(test); + Assert.assertEquals("Should have only one record", 1, records.size()); + assertEquals("Should match expected record", + expected, records.get(0)); + } + + private interface DirectWriter { + public void write(RecordConsumer consumer); + } + + private static class DirectWriteSupport extends WriteSupport { + private RecordConsumer recordConsumer; + private final MessageType type; + private final DirectWriter writer; + private final Map metadata; + + private DirectWriteSupport(MessageType type, DirectWriter writer, + Map metadata) { + this.type = type; + this.writer = writer; + this.metadata = metadata; + } + + @Override + public WriteContext init(Configuration configuration) { + return new WriteContext(type, metadata); + } + + @Override + public void prepareForWrite(RecordConsumer recordConsumer) { + this.recordConsumer = recordConsumer; + } + + @Override + public void write(Void record) { + writer.write(recordConsumer); + } + } + + private Path writeDirect(MessageType type, DirectWriter writer) throws IOException { + File temp = tempDir.newFile(UUID.randomUUID().toString()); + temp.deleteOnExit(); + temp.delete(); + + Path path = new Path(temp.getPath()); + + ParquetWriter parquetWriter = new ParquetWriter(path, + new DirectWriteSupport(type, writer, new HashMap())); + parquetWriter.write(null); + parquetWriter.close(); + + return path; + } + + private static ArrayWritable record(Writable... fields) { + return new ArrayWritable(Writable.class, fields); + } + + private static ArrayWritable list(Writable... 
elements) { + // the ObjectInspector for array and map expects an extra layer + return new ArrayWritable(ArrayWritable.class, new ArrayWritable[] { + new ArrayWritable(Writable.class, elements) + }); + } + + public static String toString(ArrayWritable arrayWritable) { + Writable[] writables = arrayWritable.get(); + String[] strings = new String[writables.length]; + for (int i = 0; i < writables.length; i += 1) { + if (writables[i] instanceof ArrayWritable) { + strings[i] = toString((ArrayWritable) writables[i]); + } else { + strings[i] = String.valueOf(writables[i]); + } + } + return Arrays.toString(strings); + } + + private static void assertEquals(String message, ArrayWritable expected, + ArrayWritable actual) { + Assert.assertEquals(message, toString(expected), toString(actual)); +// if (expected == null) { +// Assert.assertNull("Should be null (" + message + ")", actual); +// } +// Writable[] expectedContent = expected.get(); +// Writable[] actualContent = actual.get(); +// Assert.assertEquals("WritableArray length should match (" + message + ")", +// expectedContent.length, actualContent.length); +// for (int i = 0; i < expectedContent.length; i += 1) { +// if (expectedContent[i] instanceof ArrayWritable) { +// assertEquals(message + " [" + i + "]", +// (ArrayWritable) expectedContent[i], (ArrayWritable) actualContent[i]); +// } else { +// Assert.assertEquals("Value " + i + " should be equal (" + message + ")", +// expectedContent[i], actualContent[i]); +// } +// } +// return Arrays.equals(expectedContent, actualContent); + } + + public static List read(Path parquetFile) throws IOException { + List records = new ArrayList(); + + RecordReader reader = new MapredParquetInputFormat(). + getRecordReader(new FileSplit( + parquetFile, 0, fileLength(parquetFile), (String[]) null), + new JobConf(), null); + + Void alwaysNull = reader.createKey(); + ArrayWritable record = reader.createValue(); + while (reader.next(alwaysNull, record)) { + records.add(record); + record = reader.createValue(); // a new value so the last isn't clobbered + } + + return records; + } + + public static long fileLength(Path localFile) throws IOException { + return localFS.getFileStatus(localFile).getLen(); + } + +} -- 1.9.1