diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java new file mode 100644 index 0000000000..d132e07903 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java @@ -0,0 +1,263 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; +import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; +import static org.apache.parquet.column.ValuesType.VALUES; + +/** + * It's column level Parquet reader which is used to read a batch of records for a column, + * part of the code is referred from Apache Spark and Apache Parquet. + */ +public abstract class BaseVectorizedColumnReader implements VectorizedColumnReader { + + private static final Logger LOG = LoggerFactory.getLogger(BaseVectorizedColumnReader.class); + + protected boolean skipTimestampConversion = false; + + /** + * Total number of values read. + */ + protected long valuesRead; + + /** + * value that indicates the end of the current page. That is, + * if valuesRead == endOfPageValueCount, we are at the end of the page. + */ + protected long endOfPageValueCount; + + /** + * The dictionary, if this column has dictionary encoding. + */ + protected final Dictionary dictionary; + + /** + * If true, the current page is dictionary encoded. + */ + protected boolean isCurrentPageDictionaryEncoded; + + /** + * Maximum definition level for this column. + */ + protected final int maxDefLevel; + + protected int definitionLevel; + protected int repetitionLevel; + + /** + * Repetition/Definition/Value readers. + */ + protected IntIterator repetitionLevelColumn; + protected IntIterator definitionLevelColumn; + protected ValuesReader dataColumn; + + /** + * Total values in the current page. 
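+   * Set each time a new data page is read; together with endOfPageValueCount it indicates
+   * how many values remain unread in the current page.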
+ */ + protected int pageValueCount; + + protected final PageReader pageReader; + protected final ColumnDescriptor descriptor; + protected final Type type; + + public BaseVectorizedColumnReader( + ColumnDescriptor descriptor, + PageReader pageReader, + boolean skipTimestampConversion, + Type type) throws IOException { + this.descriptor = descriptor; + this.type = type; + this.pageReader = pageReader; + this.maxDefLevel = descriptor.getMaxDefinitionLevel(); + this.skipTimestampConversion = skipTimestampConversion; + + DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); + if (dictionaryPage != null) { + try { + this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage); + this.isCurrentPageDictionaryEncoded = true; + } catch (IOException e) { + throw new IOException("could not decode the dictionary for " + descriptor, e); + } + } else { + this.dictionary = null; + this.isCurrentPageDictionaryEncoded = false; + } + } + + protected void readRepetitionAndDefinitionLevels() { + repetitionLevel = repetitionLevelColumn.nextInt(); + definitionLevel = definitionLevelColumn.nextInt(); + valuesRead++; + } + + protected void readPage() throws IOException { + DataPage page = pageReader.readPage(); + + if (page == null) { + return; + } + // TODO: Why is this a visitor? + page.accept(new DataPage.Visitor() { + @Override + public Void visit(DataPageV1 dataPageV1) { + readPageV1(dataPageV1); + return null; + } + + @Override + public Void visit(DataPageV2 dataPageV2) { + readPageV2(dataPageV2); + return null; + } + }); + } + + private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException { + this.pageValueCount = valueCount; + this.endOfPageValueCount = valuesRead + pageValueCount; + if (dataEncoding.usesDictionary()) { + this.dataColumn = null; + if (dictionary == null) { + throw new IOException( + "could not read page in col " + descriptor + + " as the dictionary was missing for encoding " + dataEncoding); + } + dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary); + this.isCurrentPageDictionaryEncoded = true; + } else { + dataColumn = dataEncoding.getValuesReader(descriptor, VALUES); + this.isCurrentPageDictionaryEncoded = false; + } + + try { + dataColumn.initFromPage(pageValueCount, bytes, offset); + } catch (IOException e) { + throw new IOException("could not read page in col " + descriptor, e); + } + } + + private void readPageV1(DataPageV1 page) { + ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); + ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL); + this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader); + this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader); + try { + byte[] bytes = page.getBytes().toByteArray(); + LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records"); + LOG.debug("reading repetition levels at 0"); + rlReader.initFromPage(pageValueCount, bytes, 0); + int next = rlReader.getNextOffset(); + LOG.debug("reading definition levels at " + next); + dlReader.initFromPage(pageValueCount, bytes, next); + next = dlReader.getNextOffset(); + LOG.debug("reading data at " + next); + initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount()); + } catch (IOException e) { + throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e); + } + } + + private void readPageV2(DataPageV2 
page) { + this.pageValueCount = page.getValueCount(); + this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(), + page.getRepetitionLevels()); + this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels()); + try { + LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records"); + initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount()); + } catch (IOException e) { + throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e); + } + } + + private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) { + try { + if (maxLevel == 0) { + return new NullIntIterator(); + } + return new RLEIntIterator( + new RunLengthBitPackingHybridDecoder( + BytesUtils.getWidthFromMaxInt(maxLevel), + new ByteArrayInputStream(bytes.toByteArray()))); + } catch (IOException e) { + throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e); + } + } + + /** + * Utility classes to abstract over different way to read ints with different encodings. + * TODO: remove this layer of abstraction? + */ + abstract static class IntIterator { + abstract int nextInt(); + } + + protected static final class ValuesReaderIntIterator extends IntIterator { + ValuesReader delegate; + + public ValuesReaderIntIterator(ValuesReader delegate) { + this.delegate = delegate; + } + + @Override + int nextInt() { + return delegate.readInteger(); + } + } + + protected static final class RLEIntIterator extends IntIterator { + RunLengthBitPackingHybridDecoder delegate; + + public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) { + this.delegate = delegate; + } + + @Override + int nextInt() { + try { + return delegate.readInt(); + } catch (IOException e) { + throw new ParquetDecodingException(e); + } + } + } + + protected static final class NullIntIterator extends IntIterator { + @Override + int nextInt() { return 0; } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java new file mode 100644 index 0000000000..0cdcb75195 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java @@ -0,0 +1,468 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.schema.Type; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * It's column level Parquet reader which is used to read a batch of records for a list column. + */ +public class VectorizedListColumnReader extends BaseVectorizedColumnReader { + + // The value read in last time + private Object lastValue; + // If last value is null + private Boolean lastIsNull; + + public VectorizedListColumnReader( + ColumnDescriptor descriptor, + PageReader pageReader, + boolean skipTimestampConversion, + Type type) throws IOException { + super(descriptor, pageReader, skipTimestampConversion, type); + } + + @Override + public void readBatch(int total, ColumnVector column, TypeInfo columnType) throws IOException { + // Initialize the offsets, lengths, isNull for the new batch. + int lastRowId = -1; + ListColumnVector lcv = (ListColumnVector) column; + List valueList = new ArrayList(); + // for dictionary encoding, the value type in list will be integer + if (isCurrentPageDictionaryEncoded) { + valueList = new ArrayList(); + } + // If there has value left from last call, add the value to the current batch. + if (lastValue != null) { + lastRowId = 0; + valueList.add(lastValue); + lcv.offsets[0] = 0; + lcv.lengths[0] = 1; + lcv.isNull[0] = lastIsNull; + } + PrimitiveObjectInspector.PrimitiveCategory category = + ((PrimitiveTypeInfo)((ListTypeInfo) columnType).getListElementTypeInfo()).getPrimitiveCategory(); + while (lastRowId < total) { + // Compute the number of values we want to read in this page. + int leftInPage = (int) (endOfPageValueCount - valuesRead); + if (leftInPage == 0) { + readPage(); + leftInPage = (int) (endOfPageValueCount - valuesRead); + // No value left, return. + if (leftInPage == 0) { + break; + } + } + // Read the data from page and return the current index of this batch. + lastRowId = readRows(category, leftInPage, total, lastRowId, lcv, valueList); + } + // Decode the value if necessary + if (isCurrentPageDictionaryEncoded) { + valueList = decodeDictionaryIds((List)valueList); + } + // Convert value list to the final ListColumnVector + convertValueListToListColumnVector(category, (ListColumnVector) column, valueList); + } + + /** + * + * @param category + * @param pageRows + * @param requestBatchRows + * @param lastRowId + * @return + * @throws IOException + */ + private int readRows( + PrimitiveObjectInspector.PrimitiveCategory category, int pageRows, int requestBatchRows, + int lastRowId, ListColumnVector lcv, List valueList) throws IOException { + int left = pageRows; + // if page has data and the input rows is less than request batch, read and process the row in page. 
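+      // Descriptive note on the loop below: Parquet encodes nested list structure with
+      // repetition/definition levels (Dremel encoding).
+      //   definitionLevel >= maxDefLevel  -> the element value is present (non-null),
+      //   repetitionLevel == 0            -> the value starts a new list, i.e. a new row in the ListColumnVector,
+      //   repetitionLevel > 0             -> the value continues the current row's list.
+      // A definitionLevel below maxDefLevel is treated here as a null row.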
+ while (left > 0 && lastRowId < requestBatchRows) { + readRepetitionAndDefinitionLevels(); + // if the row in page is not null + if (definitionLevel >= maxDefLevel) { + // read the data of row in page + if (isCurrentPageDictionaryEncoded) { + lastValue = dataColumn.readValueDictionaryId(); + } else { + lastValue = readRowForPrimary(category); + } + lastIsNull = false; + // repetitionLevel = 0, the data is for the new row in ListColumnVector + if (repetitionLevel == 0) { + lastRowId++; + // lcv.offsets.length means requested row number for this batch, if read enough rows as request, return + if (lastRowId == lcv.offsets.length) { + break; + } + valueList.add(lastValue); + // update the offsets, lengths for the new row in ListColumnVector + // set the index for current row, it should be the index of last value in the valueList + lcv.offsets[lastRowId] = valueList.size() - 1; + lcv.lengths[lastRowId] = 1; + lcv.isNull[lastRowId] = false; + } else { + // repetitionLevel > 0, the data should be added to the existed row in ListColumnVector + valueList.add(lastValue); + // increase length for existed row in ListColumnVector + lcv.lengths[lastRowId]++; + } + } else { + // if the row in page is null + lastRowId++; + lastValue = 0; + lastIsNull = true; + // lcv.offsets.length means requested row number for this batch, if read enough rows as request, return + if (lastRowId == lcv.offsets.length) { + break; + } + // set the default value for new row in ListColumnVector and update the offsets, lengths information + addDefaultValue(category, valueList); + // set the index for current row, it should be the index of last value in the valueList + lcv.offsets[lastRowId] = valueList.size() - 1; + lcv.lengths[lastRowId] = 1; + lcv.isNull[lastRowId] = true; + } + left--; + } + return lastRowId; + } + + private void addDefaultValue(PrimitiveObjectInspector.PrimitiveCategory category, List valueList) { + if (isCurrentPageDictionaryEncoded) { + valueList.add(0); + } else { + switch (category) { + case INT: + case BYTE: + case SHORT: + case BOOLEAN: + valueList.add(0); + break; + case DATE: + case INTERVAL_YEAR_MONTH: + case LONG: + valueList.add(0l); + break; + case DOUBLE: + valueList.add(0.0); + break; + case BINARY: + case STRING: + case CHAR: + case VARCHAR: + case DECIMAL: + valueList.add(new byte[0]); + break; + case FLOAT: + valueList.add(0.0f); + break; + case INTERVAL_DAY_TIME: + case TIMESTAMP: + default: + throw new RuntimeException("Unsupported type in the list: " + type); + } + } + } + + private Object readRowForPrimary(PrimitiveObjectInspector.PrimitiveCategory category) { + switch (category) { + case INT: + case BYTE: + case SHORT: + return dataColumn.readInteger(); + case DATE: + case INTERVAL_YEAR_MONTH: + case LONG: + return dataColumn.readLong(); + case BOOLEAN: + return dataColumn.readBoolean() ? 
1 : 0; + case DOUBLE: + return dataColumn.readDouble(); + case BINARY: + case STRING: + case CHAR: + case VARCHAR: + return dataColumn.readBytes().getBytesUnsafe(); + case FLOAT: + return dataColumn.readFloat(); + case DECIMAL: + return dataColumn.readBytes().getBytesUnsafe(); + case INTERVAL_DAY_TIME: + case TIMESTAMP: + default: + throw new RuntimeException("Unsupported type in the list: " + type); + } + } + + private List decodeDictionaryIds(List valueList) { + int total = valueList.size(); + List resultList; + switch (descriptor.getType()) { + case INT32: + resultList = new ArrayList(total); + for (int i = 0; i < total; ++i) { + resultList.add(dictionary.decodeToInt(valueList.get(i))); + } + break; + case INT64: + resultList = new ArrayList(total); + for (int i = 0; i < total; ++i) { + resultList.add(dictionary.decodeToLong((valueList.get(i)))); + } + break; + case FLOAT: + resultList = new ArrayList(total); + for (int i = 0; i < total; ++i) { + resultList.add(dictionary.decodeToFloat(valueList.get(i))); + } + break; + case DOUBLE: + resultList = new ArrayList(total); + for (int i = 0; i < total; ++i) { + resultList.add(dictionary.decodeToDouble(valueList.get(i))); + } + break; + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + resultList = new ArrayList(total); + for (int i = 0; i < total; ++i) { + resultList.add(dictionary.decodeToBinary(valueList.get(i)).getBytesUnsafe()); + } + break; + default: + throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType()); + } + return resultList; + } + + private void fillColumnVector(PrimitiveObjectInspector.PrimitiveCategory category, ListColumnVector lcv, + List valueList) { + int total = valueList.size(); + lcv.childCount = total; + switch (category) { + case INT: + case BYTE: + case SHORT: + case BOOLEAN: + lcv.child = new LongColumnVector(total); + for (int i = 0; i < valueList.size(); i++) { + ((LongColumnVector)lcv.child).vector[i] = ((List)valueList).get(i); + } + break; + case DATE: + case INTERVAL_YEAR_MONTH: + case LONG: + lcv.child = new LongColumnVector(total); + for (int i = 0; i < valueList.size(); i++) { + ((LongColumnVector)lcv.child).vector[i] = ((List)valueList).get(i); + } + break; + case DOUBLE: + lcv.child = new DoubleColumnVector(total); + for (int i = 0; i < valueList.size(); i++) { + ((DoubleColumnVector)lcv.child).vector[i] = ((List)valueList).get(i); + } + break; + case BINARY: + case STRING: + case CHAR: + case VARCHAR: + lcv.child = new BytesColumnVector(total); + lcv.child.init(); + for (int i = 0; i < valueList.size(); i++) { + ((BytesColumnVector)lcv.child).setVal(i, ((List)valueList).get(i)); + } + break; + case FLOAT: + lcv.child = new DoubleColumnVector(total); + for (int i = 0; i < valueList.size(); i++) { + ((DoubleColumnVector)lcv.child).vector[i] = ((List)valueList).get(i); + } + break; + case DECIMAL: + int precision = type.asPrimitiveType().getDecimalMetadata().getPrecision(); + int scale = type.asPrimitiveType().getDecimalMetadata().getScale(); + lcv.child = new DecimalColumnVector(total, precision, scale); + for (int i = 0; i < valueList.size(); i++) { + ((DecimalColumnVector)lcv.child).vector[i].set(((List)valueList).get(i), scale); + } + break; + case INTERVAL_DAY_TIME: + case TIMESTAMP: + default: + throw new RuntimeException("Unsupported type in the list: " + type); + } + } + + /** + * Finish the result ListColumnVector with all collected information. 
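+   * The collected values are copied into the child vector via fillColumnVector, and
+   * isRepeating is set when every row of the batch holds identical list data.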
+ */ + private void convertValueListToListColumnVector(PrimitiveObjectInspector.PrimitiveCategory category, + ListColumnVector lcv, List valueList) { + // Fill the child of ListColumnVector with valueList + fillColumnVector(category, lcv, valueList); + setIsRepeating(lcv); + } + + private void setIsRepeating(ListColumnVector lcv) { + ColumnVector child0 = getChildData(lcv, 0); + for (int i = 1; i < lcv.offsets.length; i++) { + ColumnVector currentChild = getChildData(lcv, i); + if (!compareColumnVector(child0, currentChild)) { + lcv.isRepeating = false; + return; + } + } + lcv.isRepeating = true; + } + + /** + * Get the child ColumnVector of ListColumnVector + */ + private ColumnVector getChildData(ListColumnVector lcv, int index) { + if (lcv.offsets[index] > Integer.MAX_VALUE || lcv.lengths[index] > Integer.MAX_VALUE) { + throw new RuntimeException("The element number in list is out of scope."); + } + int start = (int)lcv.offsets[index]; + int length = (int)lcv.lengths[index]; + ColumnVector child = lcv.child; + ColumnVector resultCV = null; + if (child instanceof LongColumnVector) { + resultCV = new LongColumnVector(length); + System.arraycopy(((LongColumnVector)lcv.child).vector, start, + ((LongColumnVector)resultCV).vector, 0, length); + } + if (child instanceof DoubleColumnVector) { + resultCV = new DoubleColumnVector(length); + System.arraycopy(((DoubleColumnVector)lcv.child).vector, start, + ((DoubleColumnVector)resultCV).vector, 0, length); + } + if (child instanceof BytesColumnVector) { + resultCV = new BytesColumnVector(length); + System.arraycopy(((BytesColumnVector)lcv.child).vector, start, + ((BytesColumnVector)resultCV).vector, 0, length); + } + if (child instanceof DecimalColumnVector) { + resultCV = new DecimalColumnVector(length, + ((DecimalColumnVector) child).precision, ((DecimalColumnVector) child).scale); + System.arraycopy(((DecimalColumnVector)lcv.child).vector, start, + ((DecimalColumnVector)resultCV).vector, 0, length); + } + return resultCV; + } + + private boolean compareColumnVector(ColumnVector cv1, ColumnVector cv2) { + if (cv1 instanceof LongColumnVector && cv2 instanceof LongColumnVector) { + return compareLongColumnVector((LongColumnVector)cv1, (LongColumnVector)cv2); + } + if (cv1 instanceof DoubleColumnVector && cv2 instanceof DoubleColumnVector) { + return compareDoubleColumnVector((DoubleColumnVector)cv1, (DoubleColumnVector)cv2); + } + if (cv1 instanceof BytesColumnVector && cv2 instanceof BytesColumnVector) { + return compareBytesColumnVector((BytesColumnVector)cv1, (BytesColumnVector)cv2); + } + if (cv1 instanceof DecimalColumnVector && cv2 instanceof DecimalColumnVector) { + return compareDecimalColumnVector((DecimalColumnVector)cv1, (DecimalColumnVector)cv2); + } + throw new RuntimeException("Unsupported ColumnVector comparision between " + cv1.getClass().getName() + + " and " + cv2.getClass().getName()); + } + + private boolean compareLongColumnVector(LongColumnVector cv1, LongColumnVector cv2) { + int length1 = cv1.vector.length; + int length2 = cv2.vector.length; + if (length1 == length2) { + for (int i = 0; i < length1; i++) { + if (cv1.vector[i] != cv2.vector[i]) { + return false; + } + } + } else { + return false; + } + return true; + } + + private boolean compareDoubleColumnVector(DoubleColumnVector cv1, DoubleColumnVector cv2) { + int length1 = cv1.vector.length; + int length2 = cv2.vector.length; + if (length1 == length2) { + for (int i = 0; i < length1; i++) { + if (cv1.vector[i] != cv2.vector[i]) { + return false; + } + } + } else { + 
return false; + } + return true; + } + + private boolean compareDecimalColumnVector(DecimalColumnVector cv1, DecimalColumnVector cv2) { + int length1 = cv1.vector.length; + int length2 = cv2.vector.length; + if (length1 == length2 && cv1.scale == cv2.scale && cv1.precision == cv2.precision) { + for (int i = 0; i < length1; i++) { + if (cv1.vector[i] != null && cv2.vector[i] == null + || cv1.vector[i] == null && cv2.vector[i] != null + || cv1.vector[i] != null && cv2.vector[i] != null && !cv1.vector[i].equals(cv2.vector[i])) { + return false; + } + } + } else { + return false; + } + return true; + } + + private boolean compareBytesColumnVector(BytesColumnVector cv1, BytesColumnVector cv2) { + int length1 = cv1.vector.length; + int length2 = cv2.vector.length; + if (length1 == length2) { + for (int i = 0; i < length1; i++) { + int innerLen1 = cv1.vector[i].length; + int innerLen2 = cv2.vector[i].length; + if (innerLen1 == innerLen2) { + for (int j = 0; j < innerLen1; j++) { + if (cv1.vector[i][j] != cv2.vector[i][j]) { + return false; + } + } + } else { + return false; + } + } + } else { + return false; + } + return true; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index 1d9dba7842..941bd7da01 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.NullWritable; @@ -493,10 +495,34 @@ private VectorizedColumnReader buildVectorizedParquetReader( } return new VectorizedStructColumnReader(fieldReaders); case LIST: + checkListColumnSupport(((ListTypeInfo) typeInfo).getListElementTypeInfo()); + if (columnDescriptors == null || columnDescriptors.isEmpty()) { + throw new RuntimeException( + "Failed to find related Parquet column descriptor with type " + type); + } + return new VectorizedListColumnReader(descriptors.get(0), + pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type); case MAP: case UNION: default: throw new RuntimeException("Unsupported category " + typeInfo.getCategory().name()); } } + + /** + * Check if the element type in list is supported by vectorization read. 
+ * Supported type: INT, BYTE, SHORT, DATE, INTERVAL_YEAR_MONTH, LONG, BOOLEAN, DOUBLE, BINARY, STRING, CHAR, VARCHAR, + * FLOAT, DECIMAL + */ + private void checkListColumnSupport(TypeInfo elementType) { + if (elementType instanceof PrimitiveTypeInfo) { + switch (((PrimitiveTypeInfo)elementType).getPrimitiveCategory()) { + case INTERVAL_DAY_TIME: + case TIMESTAMP: + throw new RuntimeException("Unsupported primitive type used in list:: " + elementType); + } + } else { + throw new RuntimeException("Unsupported type used in list:" + elementType); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java index e9543c6e15..5e577d2bf8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java @@ -23,113 +23,29 @@ import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.parquet.bytes.BytesInput; -import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.Dictionary; -import org.apache.parquet.column.Encoding; -import org.apache.parquet.column.page.DataPage; -import org.apache.parquet.column.page.DataPageV1; -import org.apache.parquet.column.page.DataPageV2; -import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.PageReader; -import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; -import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.schema.Type; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.sql.Timestamp; -import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; -import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; -import static org.apache.parquet.column.ValuesType.VALUES; - /** * It's column level Parquet reader which is used to read a batch of records for a column, * part of the code is referred from Apache Spark and Apache Parquet. */ -public class VectorizedPrimitiveColumnReader implements VectorizedColumnReader { - - private static final Logger LOG = LoggerFactory.getLogger(VectorizedPrimitiveColumnReader.class); - - private boolean skipTimestampConversion = false; - - /** - * Total number of values read. - */ - private long valuesRead; - - /** - * value that indicates the end of the current page. That is, - * if valuesRead == endOfPageValueCount, we are at the end of the page. - */ - private long endOfPageValueCount; - - /** - * The dictionary, if this column has dictionary encoding. - */ - private final Dictionary dictionary; - - /** - * If true, the current page is dictionary encoded. - */ - private boolean isCurrentPageDictionaryEncoded; - - /** - * Maximum definition level for this column. - */ - private final int maxDefLevel; - - private int definitionLevel; - private int repetitionLevel; - - /** - * Repetition/Definition/Value readers. 
- */ - private IntIterator repetitionLevelColumn; - private IntIterator definitionLevelColumn; - private ValuesReader dataColumn; - - /** - * Total values in the current page. - */ - private int pageValueCount; - - private final PageReader pageReader; - private final ColumnDescriptor descriptor; - private final Type type; +public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader { public VectorizedPrimitiveColumnReader( ColumnDescriptor descriptor, PageReader pageReader, boolean skipTimestampConversion, Type type) throws IOException { - this.descriptor = descriptor; - this.type = type; - this.pageReader = pageReader; - this.maxDefLevel = descriptor.getMaxDefinitionLevel(); - this.skipTimestampConversion = skipTimestampConversion; - - DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); - if (dictionaryPage != null) { - try { - this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage); - this.isCurrentPageDictionaryEncoded = true; - } catch (IOException e) { - throw new IOException("could not decode the dictionary for " + descriptor, e); - } - } else { - this.dictionary = null; - this.isCurrentPageDictionaryEncoded = false; - } + super(descriptor, pageReader, skipTimestampConversion, type); } + @Override public void readBatch( int total, ColumnVector column, @@ -164,6 +80,7 @@ private void readBatchHelper( TypeInfo columnType, int rowId) throws IOException { PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType; + switch (primitiveColumnType.getPrimitiveCategory()) { case INT: case BYTE: @@ -444,143 +361,4 @@ private void decodeDictionaryIds( throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType()); } } - - private void readRepetitionAndDefinitionLevels() { - repetitionLevel = repetitionLevelColumn.nextInt(); - definitionLevel = definitionLevelColumn.nextInt(); - valuesRead++; - } - - private void readPage() throws IOException { - DataPage page = pageReader.readPage(); - // TODO: Why is this a visitor? 
- page.accept(new DataPage.Visitor() { - @Override - public Void visit(DataPageV1 dataPageV1) { - readPageV1(dataPageV1); - return null; - } - - @Override - public Void visit(DataPageV2 dataPageV2) { - readPageV2(dataPageV2); - return null; - } - }); - } - - private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException { - this.pageValueCount = valueCount; - this.endOfPageValueCount = valuesRead + pageValueCount; - if (dataEncoding.usesDictionary()) { - this.dataColumn = null; - if (dictionary == null) { - throw new IOException( - "could not read page in col " + descriptor + - " as the dictionary was missing for encoding " + dataEncoding); - } - dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary); - this.isCurrentPageDictionaryEncoded = true; - } else { - dataColumn = dataEncoding.getValuesReader(descriptor, VALUES); - this.isCurrentPageDictionaryEncoded = false; - } - - try { - dataColumn.initFromPage(pageValueCount, bytes, offset); - } catch (IOException e) { - throw new IOException("could not read page in col " + descriptor, e); - } - } - - private void readPageV1(DataPageV1 page) { - ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); - ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL); - this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader); - this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader); - try { - byte[] bytes = page.getBytes().toByteArray(); - LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records"); - LOG.debug("reading repetition levels at 0"); - rlReader.initFromPage(pageValueCount, bytes, 0); - int next = rlReader.getNextOffset(); - LOG.debug("reading definition levels at " + next); - dlReader.initFromPage(pageValueCount, bytes, next); - next = dlReader.getNextOffset(); - LOG.debug("reading data at " + next); - initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount()); - } catch (IOException e) { - throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e); - } - } - - private void readPageV2(DataPageV2 page) { - this.pageValueCount = page.getValueCount(); - this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(), - page.getRepetitionLevels()); - this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels()); - try { - LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records"); - initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount()); - } catch (IOException e) { - throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e); - } - } - - private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) { - try { - if (maxLevel == 0) { - return new NullIntIterator(); - } - return new RLEIntIterator( - new RunLengthBitPackingHybridDecoder( - BytesUtils.getWidthFromMaxInt(maxLevel), - new ByteArrayInputStream(bytes.toByteArray()))); - } catch (IOException e) { - throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e); - } - } - - /** - * Utility classes to abstract over different way to read ints with different encodings. - * TODO: remove this layer of abstraction? 
- */ - abstract static class IntIterator { - abstract int nextInt(); - } - - protected static final class ValuesReaderIntIterator extends IntIterator { - ValuesReader delegate; - - public ValuesReaderIntIterator(ValuesReader delegate) { - this.delegate = delegate; - } - - @Override - int nextInt() { - return delegate.readInteger(); - } - } - - protected static final class RLEIntIterator extends IntIterator { - RunLengthBitPackingHybridDecoder delegate; - - public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) { - this.delegate = delegate; - } - - @Override - int nextInt() { - try { - return delegate.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } - } - } - - protected static final class NullIntIterator extends IntIterator { - @Override - int nextInt() { return 0; } - } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java index 81d8cffa85..66d7e90659 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java @@ -133,4 +133,15 @@ public void testNullSplitForParquetReader() throws Exception { TestVectorizedParquetRecordReader testReader = new TestVectorizedParquetRecordReader(fsplit, jobConf); Assert.assertNull("Test should return null split from getSplit() method", testReader.getSplit(fsplit, jobConf)); } + + @Test + public void testListRead() throws Exception { + listWithIntRead(isDictionaryEncoding); + listWithLongRead(isDictionaryEncoding); + listWithFloatRead(isDictionaryEncoding); + listWithDoubleRead(isDictionaryEncoding); + listWithBooleanRead(); + listWithBinaryRead(isDictionaryEncoding); + listWithDecimalRead(isDictionaryEncoding); + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java index 1e60192788..523cba0661 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java @@ -82,4 +82,15 @@ public void structReadSomeNull() throws Exception { public void decimalRead() throws Exception { decimalRead(isDictionaryEncoding); } + + @Test + public void testListRead() throws Exception { + listWithIntRead(isDictionaryEncoding); + listWithLongRead(isDictionaryEncoding); + listWithFloatRead(isDictionaryEncoding); + listWithDoubleRead(isDictionaryEncoding); + listWithBooleanRead(); + listWithBinaryRead(isDictionaryEncoding); + listWithDecimalRead(isDictionaryEncoding); + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java index 1a5d0952fa..e1650e0ccd 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; import 
org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.IOConstants; @@ -115,6 +116,14 @@ + " optional int32 array_element;\n" + " }\n" + "}\n" + + "repeated int32 list_int32_field;" + + "repeated int64 list_int64_field;" + + "repeated double list_double_field;" + + "repeated float list_float_field;" + + "repeated boolean list_boolean_field;" + + "repeated fixed_len_byte_array(3) list_byte_array_field;" + + "repeated binary list_binary_field;" + + "repeated binary list_decimal_field (DECIMAL(5,2));" + "} "); protected static void removeFile() throws IOException { @@ -234,6 +243,8 @@ protected static FileSplit getFileSplit(Job vectorJob) throws IOException, Inter protected static void writeData(ParquetWriter writer, boolean isDictionaryEncoding) throws IOException { SimpleGroupFactory f = new SimpleGroupFactory(schema); + int listMaxSize = 4; + int listElementIndex = 0; for (int i = 0; i < nElements; i++) { boolean isNull = isNull(i); int intVal = getIntValue(isDictionaryEncoding, i); @@ -295,6 +306,23 @@ protected static void writeData(ParquetWriter writer, boolean isDictionar arrayGroup.addGroup("bag").append("array_element", intVal); } + int listSize = i % listMaxSize + 1; + if (!isNull) { + for (int j = 0; j < listSize; j++) { + group.append("list_int32_field", getIntValue(isDictionaryEncoding, listElementIndex)); + group.append("list_int64_field", getLongValue(isDictionaryEncoding, listElementIndex)); + group.append("list_double_field", getDoubleValue(isDictionaryEncoding, listElementIndex)); + group.append("list_float_field", getFloatValue(isDictionaryEncoding, listElementIndex)); + group.append("list_boolean_field", getBooleanValue(listElementIndex)); + group.append("list_binary_field", getBinaryValue(isDictionaryEncoding, listElementIndex)); + + HiveDecimal hd = getDecimal(isDictionaryEncoding, listElementIndex).setScale(2); + HiveDecimalWritable hdw = new HiveDecimalWritable(hd); + group.append("list_decimal_field", Binary.fromConstantByteArray(hdw.getInternalStorage())); + listElementIndex++; + } + } + writer.write(group); } writer.close(); @@ -542,6 +570,290 @@ protected void structRead(boolean isDictionaryEncoding) throws Exception { } } + protected void listWithIntRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "list_int32_field"); + conf.set(IOConstants.COLUMNS_TYPES, "array"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + String schema = "message hive_schema {repeated int32 list_int32_field;}"; + VectorizedParquetRecordReader reader = createTestParquetReader(schema, conf); + VectorizedRowBatch previous = reader.createValue(); + int row = 0; + int index = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + ListColumnVector vector = (ListColumnVector) previous.cols[0]; + LongColumnVector childVector = (LongColumnVector) vector.child; + + for (int i = 0; i < vector.offsets.length; i++) { + if (row == nElements) { + break; + } + long start = vector.offsets[i]; + long length = vector.lengths[i]; + boolean isNull = isNull(row); + if (isNull) { + assertEquals(vector.isNull[i], true); + } else { + for (long j = 0; j < length; j++) { + assertEquals(getIntValue(isDictionaryEncoding, index), childVector.vector[(int) (start + j)]); + index++; + } + } + row++; + } + } + assertEquals("It doesn't exit at 
expected position", nElements, row); + } finally { + reader.close(); + } + } + + protected void listWithLongRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "list_int64_field"); + conf.set(IOConstants.COLUMNS_TYPES, "array"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createTestParquetReader("message hive_schema {repeated int64 list_int64_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + int row = 0; + int index = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + ListColumnVector vector = (ListColumnVector) previous.cols[0]; + LongColumnVector childVector = (LongColumnVector) vector.child; + + for (int i = 0; i < vector.offsets.length; i++) { + if (row == nElements) { + break; + } + long start = vector.offsets[i]; + long length = vector.lengths[i]; + boolean isNull = isNull(row); + if (isNull) { + assertEquals(vector.isNull[i], true); + } else { + for (long j = 0; j < length; j++) { + assertEquals(getLongValue(isDictionaryEncoding, index), childVector.vector[(int) (start + j)]); + index++; + } + } + row++; + } + } + assertEquals("It doesn't exit at expected position", nElements, row); + } finally { + reader.close(); + } + } + + protected void listWithDoubleRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "list_double_field"); + conf.set(IOConstants.COLUMNS_TYPES, "array"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createTestParquetReader("message hive_schema {repeated double list_double_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + int row = 0; + int index = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + ListColumnVector vector = (ListColumnVector) previous.cols[0]; + DoubleColumnVector childVector = (DoubleColumnVector) vector.child; + + for (int i = 0; i < vector.offsets.length; i++) { + if (row == nElements) { + break; + } + long start = vector.offsets[i]; + long length = vector.lengths[i]; + boolean isNull = isNull(row); + if (isNull) { + assertEquals(vector.isNull[i], true); + } else { + for (long j = 0; j < length; j++) { + assertEquals(getDoubleValue(isDictionaryEncoding, index), childVector.vector[(int) (start + j)], 0); + index++; + } + } + row++; + } + } + assertEquals("It doesn't exit at expected position", nElements, row); + } finally { + reader.close(); + } + } + + protected void listWithFloatRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "list_float_field"); + conf.set(IOConstants.COLUMNS_TYPES, "array"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createTestParquetReader("message hive_schema {repeated float list_float_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + int row = 0; + int index = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + ListColumnVector vector = (ListColumnVector) previous.cols[0]; + DoubleColumnVector childVector = (DoubleColumnVector) vector.child; + + for (int i = 0; i < vector.offsets.length; i++) 
{ + if (row == nElements) { + break; + } + long start = vector.offsets[i]; + long length = vector.lengths[i]; + boolean isNull = isNull(row); + if (isNull) { + assertEquals(vector.isNull[i], true); + } else { + for (long j = 0; j < length; j++) { + assertEquals(getFloatValue(isDictionaryEncoding, index), childVector.vector[(int) (start + j)], 0); + index++; + } + } + row++; + } + } + assertEquals("It doesn't exit at expected position", nElements, row); + } finally { + reader.close(); + } + } + + protected void listWithBooleanRead() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "list_boolean_field"); + conf.set(IOConstants.COLUMNS_TYPES, "array"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createTestParquetReader("message hive_schema {repeated boolean list_boolean_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + int row = 0; + int index = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + ListColumnVector vector = (ListColumnVector) previous.cols[0]; + LongColumnVector childVector = (LongColumnVector) vector.child; + + for (int i = 0; i < vector.offsets.length; i++) { + if (row == nElements) { + break; + } + long start = vector.offsets[i]; + long length = vector.lengths[i]; + boolean isNull = isNull(row); + if (isNull) { + assertEquals(vector.isNull[i], true); + } else { + for (long j = 0; j < length; j++) { + assertEquals((getBooleanValue(index) ? 1 : 0), childVector.vector[(int) (start + j)]); + index++; + } + } + row++; + } + } + assertEquals("It doesn't exit at expected position", nElements, row); + } finally { + reader.close(); + } + } + + protected void listWithBinaryRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "list_binary_field"); + conf.set(IOConstants.COLUMNS_TYPES, "array"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createTestParquetReader("message hive_schema {repeated binary list_binary_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + int row = 0; + int index = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + ListColumnVector vector = (ListColumnVector) previous.cols[0]; + BytesColumnVector childVector = (BytesColumnVector) vector.child; + + for (int i = 0; i < vector.offsets.length; i++) { + if (row == nElements) { + break; + } + long start = vector.offsets[i]; + long length = vector.lengths[i]; + boolean isNull = isNull(row); + if (isNull) { + assertEquals(vector.isNull[i], true); + } else { + for (long j = 0; j < length; j++) { + int childIndex = (int) (start + j); + String actual = new String(ArrayUtils + .subarray(childVector.vector[childIndex], childVector.start[childIndex], + childVector.start[childIndex] + childVector.length[childIndex])); + assertEquals(getStr(isDictionaryEncoding, index), actual); + index++; + } + } + row++; + } + } + assertEquals("It doesn't exit at expected position", nElements, row); + } finally { + reader.close(); + } + } + + protected void listWithDecimalRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "list_decimal_field"); + conf.set(IOConstants.COLUMNS_TYPES, "array"); + 
conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createTestParquetReader("message hive_schema {repeated binary list_decimal_field (DECIMAL(5,2));}", conf); + VectorizedRowBatch previous = reader.createValue(); + int row = 0; + int index = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + ListColumnVector vector = (ListColumnVector) previous.cols[0]; + DecimalColumnVector childVector = (DecimalColumnVector) vector.child; + + for (int i = 0; i < vector.offsets.length; i++) { + if (row == nElements) { + break; + } + long start = vector.offsets[i]; + long length = vector.lengths[i]; + boolean isNull = isNull(row); + if (isNull) { + assertEquals(vector.isNull[i], true); + } else { + for (long j = 0; j < length; j++) { + assertEquals(getDecimal(isDictionaryEncoding, index), childVector.vector[(int) (start + j)].getHiveDecimal()); + index++; + } + } + row++; + } + } + assertEquals("It doesn't exit at expected position", nElements, row); + } finally { + reader.close(); + } + } + protected void nestedStructRead0(boolean isDictionaryEncoding) throws Exception { Configuration conf = new Configuration(); conf.set(IOConstants.COLUMNS, "nested_struct_field");