diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java index 5a9c7f9..e3be982 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java @@ -1,9 +1,13 @@ /** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -11,561 +15,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.hadoop.hive.ql.io.parquet.vector; -import org.apache.commons.lang.ArrayUtils; -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; -import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.parquet.bytes.BytesInput; -import org.apache.parquet.bytes.BytesUtils; -import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.Dictionary; -import org.apache.parquet.column.Encoding; -import org.apache.parquet.column.page.DataPage; -import org.apache.parquet.column.page.DataPageV1; -import org.apache.parquet.column.page.DataPageV2; -import org.apache.parquet.column.page.DictionaryPage; -import org.apache.parquet.column.page.PageReader; -import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; -import org.apache.parquet.io.ParquetDecodingException; -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.schema.Type; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.sql.Timestamp; -import java.util.Arrays; - -import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; -import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; -import static org.apache.parquet.column.ValuesType.VALUES; - -/** - * It's column level Parquet reader which is used to read a batch of records for a column, - * part of the code is referred from Apache Spark and Apache Parquet. 
- */ -public class VectorizedColumnReader { - - private static final Logger LOG = LoggerFactory.getLogger(VectorizedColumnReader.class); - - private boolean skipTimestampConversion = false; - - /** - * Total number of values read. - */ - private long valuesRead; +public interface VectorizedColumnReader { /** - * value that indicates the end of the current page. That is, - * if valuesRead == endOfPageValueCount, we are at the end of the page. + * read records with specified size and type into the columnVector + * + * @param total number of records to read into the column vector + * @param column column vector where the reader will read data into + * @param columnType the type of column vector + * @throws IOException */ - private long endOfPageValueCount; - - /** - * The dictionary, if this column has dictionary encoding. - */ - private final Dictionary dictionary; - - /** - * If true, the current page is dictionary encoded. - */ - private boolean isCurrentPageDictionaryEncoded; - - /** - * Maximum definition level for this column. - */ - private final int maxDefLevel; - - private int definitionLevel; - private int repetitionLevel; - - /** - * Repetition/Definition/Value readers. - */ - private IntIterator repetitionLevelColumn; - private IntIterator definitionLevelColumn; - private ValuesReader dataColumn; - - /** - * Total values in the current page. - */ - private int pageValueCount; - - private final PageReader pageReader; - private final ColumnDescriptor descriptor; - private final Type type; - - public VectorizedColumnReader( - ColumnDescriptor descriptor, - PageReader pageReader, - boolean skipTimestampConversion, - Type type) throws IOException { - this.descriptor = descriptor; - this.type = type; - this.pageReader = pageReader; - this.maxDefLevel = descriptor.getMaxDefinitionLevel(); - this.skipTimestampConversion = skipTimestampConversion; - - DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); - if (dictionaryPage != null) { - try { - this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage); - this.isCurrentPageDictionaryEncoded = true; - } catch (IOException e) { - throw new IOException("could not decode the dictionary for " + descriptor, e); - } - } else { - this.dictionary = null; - this.isCurrentPageDictionaryEncoded = false; - } - } - void readBatch( int total, ColumnVector column, - TypeInfo columnType) throws IOException { - - int rowId = 0; - while (total > 0) { - // Compute the number of values we want to read in this page. - int leftInPage = (int) (endOfPageValueCount - valuesRead); - if (leftInPage == 0) { - readPage(); - leftInPage = (int) (endOfPageValueCount - valuesRead); - } - - int num = Math.min(total, leftInPage); - if (isCurrentPageDictionaryEncoded) { - LongColumnVector dictionaryIds = new LongColumnVector(); - // Read and decode dictionary ids. 
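[Illustrative aside, not part of the patch] The hunk above turns VectorizedColumnReader into an interface with a single readBatch(total, column, columnType) method. A minimal sketch of how a record reader might drive one reader per projected column to fill a row batch; the BatchDriverSketch class and its nextBatch helper are illustrative names only, not from this change:

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

    final class BatchDriverSketch {
      // Asks every per-column reader for the same number of records so the vectors stay aligned.
      static void nextBatch(VectorizedRowBatch batch, VectorizedColumnReader[] readers,
          TypeInfo[] columnTypes, int num) throws IOException {
        batch.reset();
        for (int i = 0; i < readers.length; i++) {
          readers[i].readBatch(num, batch.cols[i], columnTypes[i]);
        }
        batch.size = num;
      }
    }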
- readDictionaryIDs(num, dictionaryIds, rowId); - decodeDictionaryIds(rowId, num, column, dictionaryIds); - } else { - // assign values in vector - PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType; - switch (primitiveColumnType.getPrimitiveCategory()) { - case INT: - case BYTE: - case SHORT: - readIntegers(num, (LongColumnVector) column, rowId); - break; - case DATE: - case INTERVAL_YEAR_MONTH: - case LONG: - readLongs(num, (LongColumnVector) column, rowId); - break; - case BOOLEAN: - readBooleans(num, (LongColumnVector) column, rowId); - break; - case DOUBLE: - readDoubles(num, (DoubleColumnVector) column, rowId); - break; - case BINARY: - case STRING: - case CHAR: - case VARCHAR: - readBinaries(num, (BytesColumnVector) column, rowId); - break; - case FLOAT: - readFloats(num, (DoubleColumnVector) column, rowId); - break; - case DECIMAL: - readDecimal(num, (DecimalColumnVector) column, rowId); - break; - case INTERVAL_DAY_TIME: - case TIMESTAMP: - default: - throw new IOException( - "Unsupported type category: " + primitiveColumnType.getPrimitiveCategory()); - } - } - rowId += num; - total -= num; - } - } - - private void readDictionaryIDs( - int total, - LongColumnVector c, - int rowId) throws IOException { - int left = total; - while (left > 0) { - readRepetitionAndDefinitionLevels(); - if (definitionLevel >= maxDefLevel) { - c.vector[rowId] = dataColumn.readValueDictionaryId(); - c.isNull[rowId] = false; - c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); - } else { - c.isNull[rowId] = true; - c.isRepeating = false; - c.noNulls = false; - } - rowId++; - left--; - } - } - - private void readIntegers( - int total, - LongColumnVector c, - int rowId) throws IOException { - int left = total; - while (left > 0) { - readRepetitionAndDefinitionLevels(); - if (definitionLevel >= maxDefLevel) { - c.vector[rowId] = dataColumn.readInteger(); - c.isNull[rowId] = false; - c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); - } else { - c.isNull[rowId] = true; - c.isRepeating = false; - c.noNulls = false; - } - rowId++; - left--; - } - } - - private void readDoubles( - int total, - DoubleColumnVector c, - int rowId) throws IOException { - int left = total; - while (left > 0) { - readRepetitionAndDefinitionLevels(); - if (definitionLevel >= maxDefLevel) { - c.vector[rowId] = dataColumn.readDouble(); - c.isNull[rowId] = false; - c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); - } else { - c.isNull[rowId] = true; - c.isRepeating = false; - c.noNulls = false; - } - rowId++; - left--; - } - } - - private void readBooleans( - int total, - LongColumnVector c, - int rowId) throws IOException { - int left = total; - while (left > 0) { - readRepetitionAndDefinitionLevels(); - if (definitionLevel >= maxDefLevel) { - c.vector[rowId] = dataColumn.readBoolean() ? 
1 : 0; - c.isNull[rowId] = false; - c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); - } else { - c.isNull[rowId] = true; - c.isRepeating = false; - c.noNulls = false; - } - rowId++; - left--; - } - } - - private void readLongs( - int total, - LongColumnVector c, - int rowId) throws IOException { - int left = total; - while (left > 0) { - readRepetitionAndDefinitionLevels(); - if (definitionLevel >= maxDefLevel) { - c.vector[rowId] = dataColumn.readLong(); - c.isNull[rowId] = false; - c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); - } else { - c.isNull[rowId] = true; - c.isRepeating = false; - c.noNulls = false; - } - rowId++; - left--; - } - } - - private void readFloats( - int total, - DoubleColumnVector c, - int rowId) throws IOException { - int left = total; - while (left > 0) { - readRepetitionAndDefinitionLevels(); - if (definitionLevel >= maxDefLevel) { - c.vector[rowId] = dataColumn.readFloat(); - c.isNull[rowId] = false; - c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); - } else { - c.isNull[rowId] = true; - c.isRepeating = false; - c.noNulls = false; - } - rowId++; - left--; - } - } - - private void readDecimal( - int total, - DecimalColumnVector c, - int rowId) throws IOException { - int left = total; - c.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision(); - c.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale(); - while (left > 0) { - readRepetitionAndDefinitionLevels(); - if (definitionLevel >= maxDefLevel) { - c.vector[rowId].set(dataColumn.readBytes().getBytesUnsafe(), c.scale); - c.isNull[rowId] = false; - c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); - } else { - c.isNull[rowId] = true; - c.isRepeating = false; - c.noNulls = false; - } - rowId++; - left--; - } - } - - private void readBinaries( - int total, - BytesColumnVector c, - int rowId) throws IOException { - int left = total; - while (left > 0) { - readRepetitionAndDefinitionLevels(); - if (definitionLevel >= maxDefLevel) { - c.setVal(rowId, dataColumn.readBytes().getBytesUnsafe()); - c.isNull[rowId] = false; - // TODO figure out a better way to set repeat for Binary type - c.isRepeating = false; - } else { - c.isNull[rowId] = true; - c.isRepeating = false; - c.noNulls = false; - } - rowId++; - left--; - } - } - - /** - * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`. 
- */ - private void decodeDictionaryIds(int rowId, int num, ColumnVector column, - LongColumnVector dictionaryIds) { - System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num); - if (column.noNulls) { - column.noNulls = dictionaryIds.noNulls; - } - column.isRepeating = column.isRepeating && dictionaryIds.isRepeating; - - switch (descriptor.getType()) { - case INT32: - for (int i = rowId; i < rowId + num; ++i) { - ((LongColumnVector) column).vector[i] = - dictionary.decodeToInt((int) dictionaryIds.vector[i]); - } - break; - case INT64: - for (int i = rowId; i < rowId + num; ++i) { - ((LongColumnVector) column).vector[i] = - dictionary.decodeToLong((int) dictionaryIds.vector[i]); - } - break; - case FLOAT: - for (int i = rowId; i < rowId + num; ++i) { - ((DoubleColumnVector) column).vector[i] = - dictionary.decodeToFloat((int) dictionaryIds.vector[i]); - } - break; - case DOUBLE: - for (int i = rowId; i < rowId + num; ++i) { - ((DoubleColumnVector) column).vector[i] = - dictionary.decodeToDouble((int) dictionaryIds.vector[i]); - } - break; - case INT96: - for (int i = rowId; i < rowId + num; ++i) { - ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer(); - buf.order(ByteOrder.LITTLE_ENDIAN); - long timeOfDayNanos = buf.getLong(); - int julianDay = buf.getInt(); - NanoTime nt = new NanoTime(julianDay, timeOfDayNanos); - Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion); - ((TimestampColumnVector) column).set(i, ts); - } - break; - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - for (int i = rowId; i < rowId + num; ++i) { - ((BytesColumnVector) column) - .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe()); - } - break; - default: - throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType()); - } - } - - private void readRepetitionAndDefinitionLevels() { - repetitionLevel = repetitionLevelColumn.nextInt(); - definitionLevel = definitionLevelColumn.nextInt(); - valuesRead++; - } - - private void readPage() throws IOException { - DataPage page = pageReader.readPage(); - // TODO: Why is this a visitor? 
- page.accept(new DataPage.Visitor() { - @Override - public Void visit(DataPageV1 dataPageV1) { - readPageV1(dataPageV1); - return null; - } - - @Override - public Void visit(DataPageV2 dataPageV2) { - readPageV2(dataPageV2); - return null; - } - }); - } - - private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException { - this.pageValueCount = valueCount; - this.endOfPageValueCount = valuesRead + pageValueCount; - if (dataEncoding.usesDictionary()) { - this.dataColumn = null; - if (dictionary == null) { - throw new IOException( - "could not read page in col " + descriptor + - " as the dictionary was missing for encoding " + dataEncoding); - } - dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary); - this.isCurrentPageDictionaryEncoded = true; - } else { - if (dataEncoding != Encoding.PLAIN) { - throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding); - } - dataColumn = dataEncoding.getValuesReader(descriptor, VALUES); - this.isCurrentPageDictionaryEncoded = false; - } - - try { - dataColumn.initFromPage(pageValueCount, bytes, offset); - } catch (IOException e) { - throw new IOException("could not read page in col " + descriptor, e); - } - } - - private void readPageV1(DataPageV1 page) { - ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); - ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL); - this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader); - this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader); - try { - byte[] bytes = page.getBytes().toByteArray(); - LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records"); - LOG.debug("reading repetition levels at 0"); - rlReader.initFromPage(pageValueCount, bytes, 0); - int next = rlReader.getNextOffset(); - LOG.debug("reading definition levels at " + next); - dlReader.initFromPage(pageValueCount, bytes, next); - next = dlReader.getNextOffset(); - LOG.debug("reading data at " + next); - initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount()); - } catch (IOException e) { - throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e); - } - } - - private void readPageV2(DataPageV2 page) { - this.pageValueCount = page.getValueCount(); - this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(), - page.getRepetitionLevels()); - this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels()); - try { - LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records"); - initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount()); - } catch (IOException e) { - throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e); - } - } - - private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) { - try { - if (maxLevel == 0) { - return new NullIntIterator(); - } - return new RLEIntIterator( - new RunLengthBitPackingHybridDecoder( - BytesUtils.getWidthFromMaxInt(maxLevel), - new ByteArrayInputStream(bytes.toByteArray()))); - } catch (IOException e) { - throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e); - } - } - - /** - * Utility classes to abstract over different way to read ints with different encodings. 
- * TODO: remove this layer of abstraction? - */ - abstract static class IntIterator { - abstract int nextInt(); - } - - protected static final class ValuesReaderIntIterator extends IntIterator { - ValuesReader delegate; - - public ValuesReaderIntIterator(ValuesReader delegate) { - this.delegate = delegate; - } - - @Override - int nextInt() { - return delegate.readInteger(); - } - } - - protected static final class RLEIntIterator extends IntIterator { - RunLengthBitPackingHybridDecoder delegate; - - public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) { - this.delegate = delegate; - } - - @Override - int nextInt() { - try { - return delegate.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } - } - } - - protected static final class NullIntIterator extends IntIterator { - @Override - int nextInt() { return 0; } - } + TypeInfo columnType) throws IOException; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index f94c49a..699de59 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -23,11 +23,13 @@ import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.parquet.ParquetRuntimeException; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.filter2.compat.FilterCompat; @@ -35,6 +37,7 @@ import org.apache.parquet.hadoop.ParquetInputSplit; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.InvalidSchemaException; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; import org.slf4j.Logger; @@ -68,6 +71,7 @@ private List columnNamesList; private List columnTypesList; private VectorizedRowBatchCtx rbCtx; + private List indexColumnsWanted; /** * For each request column, the reader to read this column. 
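[Illustrative aside, not part of the patch] The hunk below adds buildVectorizedParquetReader, which walks the Hive type recursively: a PRIMITIVE type yields one VectorizedPrimitiveColumnReader bound to its matching Parquet column descriptor, a STRUCT yields a VectorizedStructColumnReader wrapping one child reader per field, and LIST/MAP/UNION are rejected. A small self-contained sketch that mirrors only the shape of that recursion (countLeafReaders is a hypothetical helper, not part of the patch):

    import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

    final class ReaderShapeSketch {
      // A primitive column needs exactly one leaf reader; a struct needs one per nested primitive.
      static int countLeafReaders(TypeInfo typeInfo) {
        switch (typeInfo.getCategory()) {
          case PRIMITIVE:
            return 1;
          case STRUCT: {
            int leaves = 0;
            for (TypeInfo field : ((StructTypeInfo) typeInfo).getAllStructFieldTypeInfos()) {
              leaves += countLeafReaders(field);
            }
            return leaves;
          }
          default:
            // The patch likewise throws for LIST, MAP and UNION.
            throw new UnsupportedOperationException("Unsupported category " + typeInfo.getCategory());
        }
      }
    }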
This is NULL if this column @@ -198,7 +202,7 @@ public void initialize( columnTypesList); } - List indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration); + indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration); if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) { requestedSchema = DataWritableReadSupport.getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted); @@ -279,11 +283,81 @@ private void checkEndOfRowGroup() throws IOException { List columns = requestedSchema.getColumns(); List types = requestedSchema.getFields(); columnReaders = new VectorizedColumnReader[columns.size()]; - for (int i = 0; i < columns.size(); ++i) { - columnReaders[i] = - new VectorizedColumnReader(columns.get(i), pages.getPageReader(columns.get(i)), - skipTimestampConversion, types.get(i)); + + if (!ColumnProjectionUtils.isReadAllColumns(jobConf) && !indexColumnsWanted.isEmpty()) { + for (int i = 0; i < types.size(); ++i) { + columnReaders[i] = + buildVectorizedParquetReader(columnTypesList.get(indexColumnsWanted.get(i)), types.get(i), + pages, requestedSchema.getColumns(), skipTimestampConversion, 0); + } + } else { + for (int i = 0; i < types.size(); ++i) { + columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(i), types.get(i), pages, + requestedSchema.getColumns(), skipTimestampConversion, 0); + } } + totalCountLoadedSoFar += pages.getRowCount(); } + + private List getAllColumnDescriptorByType( + int depth, + Type type, + List columns) throws ParquetRuntimeException { + List res = new ArrayList<>(); + for (ColumnDescriptor descriptor : columns) { + if (depth >= descriptor.getPath().length) { + throw new InvalidSchemaException("Corrupted Parquet schema"); + } + if (type.getName().equals(descriptor.getPath()[depth])) { + res.add(descriptor); + } + } + return res; + } + + // Build VectorizedParquetColumnReader via Hive typeInfo and Parquet schema + private VectorizedColumnReader buildVectorizedParquetReader( + TypeInfo typeInfo, + Type type, + PageReadStore pages, + List columnDescriptors, + boolean skipTimestampConversion, + int depth) throws IOException { + List descriptors = + getAllColumnDescriptorByType(depth, type, columnDescriptors); + switch (typeInfo.getCategory()) { + case PRIMITIVE: + if (columnDescriptors == null || columnDescriptors.isEmpty()) { + throw new RuntimeException( + "Failed to find related Parquet column descriptor with type " + type); + } else { + return new VectorizedPrimitiveColumnReader(descriptors.get(0), + pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type); + } + case STRUCT: + StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo; + List fieldReaders = new ArrayList<>(); + List fieldTypes = structTypeInfo.getAllStructFieldTypeInfos(); + List types = type.asGroupType().getFields(); + for (int i = 0; i < fieldTypes.size(); i++) { + VectorizedColumnReader r = + buildVectorizedParquetReader(fieldTypes.get(i), types.get(i), pages, descriptors, + skipTimestampConversion, depth + 1); + if (r != null) { + fieldReaders.add(r); + } else { + throw new RuntimeException( + "Fail to build Parquet vectorized reader based on Hive type " + fieldTypes.get(i) + .getTypeName() + " and Parquet type" + types.get(i).toString()); + } + } + return new VectorizedStructColumnReader(fieldReaders); + case LIST: + case MAP: + case UNION: + default: + throw new RuntimeException("Unsupported category " + typeInfo.getCategory().name()); + } + } } diff --git 
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java new file mode 100644 index 0000000..3d5c6e6 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java @@ -0,0 +1,589 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; +import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Timestamp; + +import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; +import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; +import static org.apache.parquet.column.ValuesType.VALUES; + +/** + * It's column level Parquet reader which is used to read a batch of records for a column, + * part of the code is referred from Apache Spark and Apache Parquet. + */ +public class VectorizedPrimitiveColumnReader implements VectorizedColumnReader { + + private static final Logger LOG = LoggerFactory.getLogger(VectorizedPrimitiveColumnReader.class); + + private boolean skipTimestampConversion = false; + + /** + * Total number of values read. + */ + private long valuesRead; + + /** + * value that indicates the end of the current page. That is, + * if valuesRead == endOfPageValueCount, we are at the end of the page. + */ + private long endOfPageValueCount; + + /** + * The dictionary, if this column has dictionary encoding. 
+ */ + private final Dictionary dictionary; + + /** + * If true, the current page is dictionary encoded. + */ + private boolean isCurrentPageDictionaryEncoded; + + /** + * Maximum definition level for this column. + */ + private final int maxDefLevel; + + private int definitionLevel; + private int repetitionLevel; + + /** + * Repetition/Definition/Value readers. + */ + private IntIterator repetitionLevelColumn; + private IntIterator definitionLevelColumn; + private ValuesReader dataColumn; + + /** + * Total values in the current page. + */ + private int pageValueCount; + + private final PageReader pageReader; + private final ColumnDescriptor descriptor; + private final Type type; + + public VectorizedPrimitiveColumnReader( + ColumnDescriptor descriptor, + PageReader pageReader, + boolean skipTimestampConversion, + Type type) throws IOException { + this.descriptor = descriptor; + this.type = type; + this.pageReader = pageReader; + this.maxDefLevel = descriptor.getMaxDefinitionLevel(); + this.skipTimestampConversion = skipTimestampConversion; + + DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); + if (dictionaryPage != null) { + try { + this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage); + this.isCurrentPageDictionaryEncoded = true; + } catch (IOException e) { + throw new IOException("could not decode the dictionary for " + descriptor, e); + } + } else { + this.dictionary = null; + this.isCurrentPageDictionaryEncoded = false; + } + } + + public void readBatch( + int total, + ColumnVector column, + TypeInfo columnType) throws IOException { + int rowId = 0; + while (total > 0) { + // Compute the number of values we want to read in this page. + int leftInPage = (int) (endOfPageValueCount - valuesRead); + if (leftInPage == 0) { + readPage(); + leftInPage = (int) (endOfPageValueCount - valuesRead); + } + + int num = Math.min(total, leftInPage); + if (isCurrentPageDictionaryEncoded) { + LongColumnVector dictionaryIds = new LongColumnVector(); + // Read and decode dictionary ids. 
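// [Illustrative aside, not part of the patch] Worked example of the paging arithmetic above, with
// made-up numbers: if total = 1024 records are requested and the current page has leftInPage = 400
// values remaining, num = min(1024, 400) = 400 values are read at rowId 0; readPage() then loads
// the next page and, assuming it holds at least 624 values, the remaining 624 values are read
// starting at rowId 400.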
+ readDictionaryIDs(num, dictionaryIds, rowId); + decodeDictionaryIds(rowId, num, column, dictionaryIds); + } else { + // assign values in vector + readBatchHelper(num, column, columnType, rowId); + } + rowId += num; + total -= num; + } + } + + private void readBatchHelper( + int num, + ColumnVector column, + TypeInfo columnType, + int rowId) throws IOException { + PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType; + switch (primitiveColumnType.getPrimitiveCategory()) { + case INT: + case BYTE: + case SHORT: + readIntegers(num, (LongColumnVector) column, rowId); + break; + case DATE: + case INTERVAL_YEAR_MONTH: + case LONG: + readLongs(num, (LongColumnVector) column, rowId); + break; + case BOOLEAN: + readBooleans(num, (LongColumnVector) column, rowId); + break; + case DOUBLE: + readDoubles(num, (DoubleColumnVector) column, rowId); + break; + case BINARY: + case STRING: + case CHAR: + case VARCHAR: + readBinaries(num, (BytesColumnVector) column, rowId); + break; + case FLOAT: + readFloats(num, (DoubleColumnVector) column, rowId); + break; + case DECIMAL: + readDecimal(num, (DecimalColumnVector) column, rowId); + break; + case INTERVAL_DAY_TIME: + case TIMESTAMP: + default: + throw new IOException("Unsupported type: " + type); + } + } + + private void readDictionaryIDs( + int total, + LongColumnVector c, + int rowId) throws IOException { + int left = total; + while (left > 0) { + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId] = dataColumn.readValueDictionaryId(); + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + private void readIntegers( + int total, + LongColumnVector c, + int rowId) throws IOException { + int left = total; + while (left > 0) { + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId] = dataColumn.readInteger(); + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + private void readDoubles( + int total, + DoubleColumnVector c, + int rowId) throws IOException { + int left = total; + while (left > 0) { + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId] = dataColumn.readDouble(); + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + private void readBooleans( + int total, + LongColumnVector c, + int rowId) throws IOException { + int left = total; + while (left > 0) { + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId] = dataColumn.readBoolean() ? 
1 : 0; + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + private void readLongs( + int total, + LongColumnVector c, + int rowId) throws IOException { + int left = total; + while (left > 0) { + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId] = dataColumn.readLong(); + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + private void readFloats( + int total, + DoubleColumnVector c, + int rowId) throws IOException { + int left = total; + while (left > 0) { + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId] = dataColumn.readFloat(); + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + private void readDecimal( + int total, + DecimalColumnVector c, + int rowId) throws IOException { + int left = total; + c.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision(); + c.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale(); + while (left > 0) { + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId].set(dataColumn.readBytes().getBytesUnsafe(), c.scale); + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + private void readBinaries( + int total, + BytesColumnVector c, + int rowId) throws IOException { + int left = total; + while (left > 0) { + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.setVal(rowId, dataColumn.readBytes().getBytesUnsafe()); + c.isNull[rowId] = false; + // TODO figure out a better way to set repeat for Binary type + c.isRepeating = false; + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + /** + * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`. 
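// [Illustrative aside, not part of the patch] Shared pattern in the readXxx methods above: for a
// flat optional column maxDefLevel is 1, so a definition level equal to maxDefLevel means a value
// is present and is copied into c.vector[rowId], while a smaller level marks a null. Each null
// clears c.noNulls and c.isRepeating; otherwise isRepeating stays true only while every value read
// so far equals c.vector[0] (binary columns simply force isRepeating to false).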
+ */ + private void decodeDictionaryIds( + int rowId, + int num, + ColumnVector column, + LongColumnVector dictionaryIds) { + System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num); + if (column.noNulls) { + column.noNulls = dictionaryIds.noNulls; + } + column.isRepeating = column.isRepeating && dictionaryIds.isRepeating; + + switch (descriptor.getType()) { + case INT32: + for (int i = rowId; i < rowId + num; ++i) { + ((LongColumnVector) column).vector[i] = + dictionary.decodeToInt((int) dictionaryIds.vector[i]); + } + break; + case INT64: + for (int i = rowId; i < rowId + num; ++i) { + ((LongColumnVector) column).vector[i] = + dictionary.decodeToLong((int) dictionaryIds.vector[i]); + } + break; + case FLOAT: + for (int i = rowId; i < rowId + num; ++i) { + ((DoubleColumnVector) column).vector[i] = + dictionary.decodeToFloat((int) dictionaryIds.vector[i]); + } + break; + case DOUBLE: + for (int i = rowId; i < rowId + num; ++i) { + ((DoubleColumnVector) column).vector[i] = + dictionary.decodeToDouble((int) dictionaryIds.vector[i]); + } + break; + case INT96: + for (int i = rowId; i < rowId + num; ++i) { + ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer(); + buf.order(ByteOrder.LITTLE_ENDIAN); + long timeOfDayNanos = buf.getLong(); + int julianDay = buf.getInt(); + NanoTime nt = new NanoTime(julianDay, timeOfDayNanos); + Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion); + ((TimestampColumnVector) column).set(i, ts); + } + break; + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + if (column instanceof BytesColumnVector) { + for (int i = rowId; i < rowId + num; ++i) { + ((BytesColumnVector) column) + .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe()); + } + } else { + DecimalColumnVector decimalColumnVector = ((DecimalColumnVector) column); + decimalColumnVector.precision = + (short) type.asPrimitiveType().getDecimalMetadata().getPrecision(); + decimalColumnVector.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale(); + for (int i = rowId; i < rowId + num; ++i) { + decimalColumnVector.vector[i] + .set(dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe(), + decimalColumnVector.scale); + } + } + break; + default: + throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType()); + } + } + + private void readRepetitionAndDefinitionLevels() { + repetitionLevel = repetitionLevelColumn.nextInt(); + definitionLevel = definitionLevelColumn.nextInt(); + valuesRead++; + } + + private void readPage() throws IOException { + DataPage page = pageReader.readPage(); + // TODO: Why is this a visitor? 
+ page.accept(new DataPage.Visitor() { + @Override + public Void visit(DataPageV1 dataPageV1) { + readPageV1(dataPageV1); + return null; + } + + @Override + public Void visit(DataPageV2 dataPageV2) { + readPageV2(dataPageV2); + return null; + } + }); + } + + private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException { + this.pageValueCount = valueCount; + this.endOfPageValueCount = valuesRead + pageValueCount; + if (dataEncoding.usesDictionary()) { + this.dataColumn = null; + if (dictionary == null) { + throw new IOException( + "could not read page in col " + descriptor + + " as the dictionary was missing for encoding " + dataEncoding); + } + dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary); + this.isCurrentPageDictionaryEncoded = true; + } else { + if (dataEncoding != Encoding.PLAIN) { + throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding); + } + dataColumn = dataEncoding.getValuesReader(descriptor, VALUES); + this.isCurrentPageDictionaryEncoded = false; + } + + try { + dataColumn.initFromPage(pageValueCount, bytes, offset); + } catch (IOException e) { + throw new IOException("could not read page in col " + descriptor, e); + } + } + + private void readPageV1(DataPageV1 page) { + ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); + ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL); + this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader); + this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader); + try { + byte[] bytes = page.getBytes().toByteArray(); + LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records"); + LOG.debug("reading repetition levels at 0"); + rlReader.initFromPage(pageValueCount, bytes, 0); + int next = rlReader.getNextOffset(); + LOG.debug("reading definition levels at " + next); + dlReader.initFromPage(pageValueCount, bytes, next); + next = dlReader.getNextOffset(); + LOG.debug("reading data at " + next); + initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount()); + } catch (IOException e) { + throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e); + } + } + + private void readPageV2(DataPageV2 page) { + this.pageValueCount = page.getValueCount(); + this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(), + page.getRepetitionLevels()); + this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels()); + try { + LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records"); + initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount()); + } catch (IOException e) { + throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e); + } + } + + private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) { + try { + if (maxLevel == 0) { + return new NullIntIterator(); + } + return new RLEIntIterator( + new RunLengthBitPackingHybridDecoder( + BytesUtils.getWidthFromMaxInt(maxLevel), + new ByteArrayInputStream(bytes.toByteArray()))); + } catch (IOException e) { + throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e); + } + } + + /** + * Utility classes to abstract over different way to read ints with different encodings. 
+ * TODO: remove this layer of abstraction? + */ + abstract static class IntIterator { + abstract int nextInt(); + } + + protected static final class ValuesReaderIntIterator extends IntIterator { + ValuesReader delegate; + + public ValuesReaderIntIterator(ValuesReader delegate) { + this.delegate = delegate; + } + + @Override + int nextInt() { + return delegate.readInteger(); + } + } + + protected static final class RLEIntIterator extends IntIterator { + RunLengthBitPackingHybridDecoder delegate; + + public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) { + this.delegate = delegate; + } + + @Override + int nextInt() { + try { + return delegate.readInt(); + } catch (IOException e) { + throw new ParquetDecodingException(e); + } + } + } + + protected static final class NullIntIterator extends IntIterator { + @Override + int nextInt() { return 0; } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java new file mode 100644 index 0000000..cc6cb20 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +import java.io.IOException; +import java.util.List; + +public class VectorizedStructColumnReader implements VectorizedColumnReader { + + private final List fieldReaders; + + public VectorizedStructColumnReader(List fieldReaders) { + this.fieldReaders = fieldReaders; + } + + @Override + public void readBatch( + int total, + ColumnVector column, + TypeInfo columnType) throws IOException { + StructColumnVector structColumnVector = (StructColumnVector) column; + StructTypeInfo structTypeInfo = (StructTypeInfo) columnType; + ColumnVector[] vectors = structColumnVector.fields; + for (int i = 0; i < vectors.length; i++) { + fieldReaders.get(i) + .readBatch(total, vectors[i], structTypeInfo.getAllStructFieldTypeInfos().get(i)); + structColumnVector.isRepeating = structColumnVector.isRepeating && vectors[i].isRepeating; + + for (int j = 0; j < vectors[i].isNull.length; j++) { + structColumnVector.isNull[j] = + (i == 0) ? vectors[i].isNull[j] : structColumnVector.isNull[j] && vectors[i].isNull[j]; + } + structColumnVector.noNulls = + (i == 0) ? 
vectors[i].noNulls : structColumnVector.noNulls && vectors[i].noNulls; + } + + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java index 276ff19..d4b4140 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java @@ -1,9 +1,13 @@ /** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -14,416 +18,74 @@ package org.apache.hadoop.hive.ql.io.parquet; -import org.apache.commons.lang.ArrayUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.io.IOConstants; -import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; -import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; -import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; -import org.apache.parquet.example.data.Group; -import org.apache.parquet.example.data.simple.SimpleGroupFactory; -import org.apache.parquet.hadoop.ParquetInputFormat; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.example.GroupReadSupport; -import org.apache.parquet.hadoop.example.GroupWriteSupport; -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.schema.MessageType; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; -import java.util.List; -import java.util.Random; -import static junit.framework.Assert.assertTrue; -import static 
junit.framework.TestCase.assertFalse; -import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0; -import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA; -import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; -import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; -import static org.junit.Assert.assertEquals; - -public class TestVectorizedColumnReader { - - private static final int nElements = 2500; - protected static final Configuration conf = new Configuration(); - protected static final Path file = - new Path("target/test/TestParquetVectorReader/testParquetFile"); - private static String[] uniqueStrs = new String[nElements]; - private static boolean[] isNulls = new boolean[nElements]; - private static Random random = new Random(); - protected static final MessageType schema = parseMessageType( - "message test { " - + "required int32 int32_field; " - + "required int64 int64_field; " - + "required int96 int96_field; " - + "required double double_field; " - + "required float float_field; " - + "required boolean boolean_field; " - + "required fixed_len_byte_array(3) flba_field; " - + "optional fixed_len_byte_array(1) some_null_field; " - + "optional fixed_len_byte_array(1) all_null_field; " - + "optional binary binary_field; " - + "optional binary binary_field_non_repeating; " - + "} "); - - @AfterClass - public static void cleanup() throws IOException { - FileSystem fs = file.getFileSystem(conf); - if (fs.exists(file)) { - fs.delete(file, true); - } - } +public class TestVectorizedColumnReader extends TestVectorizedColumnReaderBase { + static boolean isDictionaryEncoding = false; @BeforeClass - public static void prepareFile() throws IOException { - cleanup(); - - boolean dictionaryEnabled = true; - boolean validating = false; - GroupWriteSupport.setSchema(schema, conf); - SimpleGroupFactory f = new SimpleGroupFactory(schema); - ParquetWriter writer = new ParquetWriter( - file, - new GroupWriteSupport(), - GZIP, 1024*1024, 1024, 1024*1024, - dictionaryEnabled, validating, PARQUET_1_0, conf); - writeData(f, writer); - } - - protected static void writeData(SimpleGroupFactory f, ParquetWriter writer) throws IOException { - initialStrings(uniqueStrs); - for (int i = 0; i < nElements; i++) { - Group group = f.newGroup() - .append("int32_field", i) - .append("int64_field", (long) 2 * i) - .append("int96_field", Binary.fromReusedByteArray("999999999999".getBytes())) - .append("double_field", i * 1.0) - .append("float_field", ((float) (i * 2.0))) - .append("boolean_field", i % 5 == 0) - .append("flba_field", "abc"); - - if (i % 2 == 1) { - group.append("some_null_field", "x"); - } - - if (i % 13 != 1) { - int binaryLen = i % 10; - group.append("binary_field", - Binary.fromString(new String(new char[binaryLen]).replace("\0", "x"))); - } - - if (uniqueStrs[i] != null) { - group.append("binary_field_non_repeating", Binary.fromString(uniqueStrs[i])); - } - writer.write(group); - } - writer.close(); + public static void setup() throws IOException { + removeFile(); + writeData(initWriterFromFile(), isDictionaryEncoding); } - private static String getRandomStr() { - int len = random.nextInt(10); - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < len; i++) { - sb.append((char) ('a' + random.nextInt(25))); - } - return sb.toString(); - } - - public static void initialStrings(String[] uniqueStrs) { - for (int i = 0; i < uniqueStrs.length; i++) { - String str = getRandomStr(); - if (!str.isEmpty()) { - 
uniqueStrs[i] = str; - isNulls[i] = false; - }else{ - isNulls[i] = true; - } - } + @AfterClass + public static void cleanup() throws IOException { + removeFile(); } - private VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf) - throws IOException, InterruptedException, HiveException { - conf.set(PARQUET_READ_SCHEMA, schemaString); - HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); - HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp"); - - Job vectorJob = new Job(conf, "read vector"); - ParquetInputFormat.setInputPaths(vectorJob, file); - ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class); - InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0); - initialVectorizedRowBatchCtx(conf); - return new VectorizedParquetRecordReader(split, new JobConf(conf)); + @Test + public void testIntRead() throws Exception { + intRead(isDictionaryEncoding); } - private void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException { - MapWork mapWork = new MapWork(); - VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx(); - rbCtx.init(createStructObjectInspector(conf), new String[0]); - mapWork.setVectorMode(true); - mapWork.setVectorizedRowBatchCtx(rbCtx); - Utilities.setMapWork(conf, mapWork); + @Test + public void testLongRead() throws Exception { + longRead(isDictionaryEncoding); } - private StructObjectInspector createStructObjectInspector(Configuration conf) { - // Create row related objects - String columnNames = conf.get(IOConstants.COLUMNS); - List columnNamesList = DataWritableReadSupport.getColumnNames(columnNames); - String columnTypes = conf.get(IOConstants.COLUMNS_TYPES); - List columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes); - TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList); - return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo); + @Test + public void testDoubleRead() throws Exception { + doubleRead(isDictionaryEncoding); } @Test - public void testIntRead() throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"int32_field"); - conf.set(IOConstants.COLUMNS_TYPES,"int"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - VectorizedParquetRecordReader reader = - createParquetReader("message test { required int32 int32_field;}", conf); - VectorizedRowBatch previous = reader.createValue(); - try { - long c = 0; - while (reader.next(NullWritable.get(), previous)) { - LongColumnVector vector = (LongColumnVector) previous.cols[0]; - assertTrue(vector.noNulls); - for (int i = 0; i < vector.vector.length; i++) { - if(c == nElements){ - break; - } - assertEquals(c, vector.vector[i]); - assertFalse(vector.isNull[i]); - c++; - } - } - assertEquals(nElements, c); - } finally { - reader.close(); - } + public void testFloatRead() throws Exception { + floatRead(isDictionaryEncoding); } @Test - public void testLongRead() throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"int64_field"); - conf.set(IOConstants.COLUMNS_TYPES, "bigint"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - VectorizedParquetRecordReader reader = - createParquetReader("message test { required int64 int64_field;}", conf); - VectorizedRowBatch previous = 
reader.createValue(); - try { - long c = 0; - while (reader.next(NullWritable.get(), previous)) { - LongColumnVector vector = (LongColumnVector) previous.cols[0]; - assertTrue(vector.noNulls); - for (int i = 0; i < vector.vector.length; i++) { - if(c == nElements){ - break; - } - assertEquals(2 * c, vector.vector[i]); - assertFalse(vector.isNull[i]); - c++; - } - } - assertEquals(nElements, c); - } finally { - reader.close(); - } + public void testBooleanRead() throws Exception { + booleanRead(); } @Test - public void testDoubleRead() throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"double_field"); - conf.set(IOConstants.COLUMNS_TYPES, "double"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - VectorizedParquetRecordReader reader = - createParquetReader("message test { required double double_field;}", conf); - VectorizedRowBatch previous = reader.createValue(); - try { - long c = 0; - while (reader.next(NullWritable.get(), previous)) { - DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; - assertTrue(vector.noNulls); - for (int i = 0; i < vector.vector.length; i++) { - if(c == nElements){ - break; - } - assertEquals(1.0 * c, vector.vector[i], 0); - assertFalse(vector.isNull[i]); - c++; - } - } - assertEquals(nElements, c); - } finally { - reader.close(); - } + public void testBinaryRead() throws Exception { + binaryRead(isDictionaryEncoding); } @Test - public void testFloatRead() throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"float_field"); - conf.set(IOConstants.COLUMNS_TYPES, "float"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - VectorizedParquetRecordReader reader = - createParquetReader("message test { required float float_field;}", conf); - VectorizedRowBatch previous = reader.createValue(); - try { - long c = 0; - while (reader.next(NullWritable.get(), previous)) { - DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; - assertTrue(vector.noNulls); - for (int i = 0; i < vector.vector.length; i++) { - if(c == nElements){ - break; - } - assertEquals((float)2.0 * c, vector.vector[i], 0); - assertFalse(vector.isNull[i]); - c++; - } - } - assertEquals(nElements, c); - } finally { - reader.close(); - } + public void testStructRead() throws Exception { + structRead(isDictionaryEncoding); } @Test - public void testBooleanRead() throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"boolean_field"); - conf.set(IOConstants.COLUMNS_TYPES, "boolean"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - VectorizedParquetRecordReader reader = - createParquetReader("message test { required boolean boolean_field;}", conf); - VectorizedRowBatch previous = reader.createValue(); - try { - long c = 0; - while (reader.next(NullWritable.get(), previous)) { - LongColumnVector vector = (LongColumnVector) previous.cols[0]; - assertTrue(vector.noNulls); - for (int i = 0; i < vector.vector.length; i++) { - if(c == nElements){ - break; - } - int e = (c % 5 == 0) ? 
1 : 0; - assertEquals(e, vector.vector[i]); - assertFalse(vector.isNull[i]); - c++; - } - } - assertEquals(nElements, c); - } finally { - reader.close(); - } + public void testNestedStructRead() throws Exception { + nestedStructRead0(isDictionaryEncoding); + nestedStructRead1(isDictionaryEncoding); } @Test - public void testBinaryReadDictionaryEncoding() throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"binary_field"); - conf.set(IOConstants.COLUMNS_TYPES, "string"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - VectorizedParquetRecordReader reader = - createParquetReader("message test { required binary binary_field;}", conf); - VectorizedRowBatch previous = reader.createValue(); - int c = 0; - try { - while (reader.next(NullWritable.get(), previous)) { - BytesColumnVector vector = (BytesColumnVector) previous.cols[0]; - boolean noNull = true; - for (int i = 0; i < vector.vector.length; i++) { - if(c == nElements){ - break; - } - if (c % 13 == 1) { - assertTrue(vector.isNull[i]); - } else { - assertFalse(vector.isNull[i]); - int binaryLen = c % 10; - String expected = new String(new char[binaryLen]).replace("\0", "x"); - String actual = new String(ArrayUtils - .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i])); - assertEquals("Failed at " + c, expected, actual); - noNull = false; - } - c++; - } - assertEquals("No Null check failed at " + c, noNull, vector.noNulls); - assertFalse(vector.isRepeating); - } - assertEquals(nElements, c); - } finally { - reader.close(); - } + public void structReadSomeNull() throws Exception { + structReadSomeNull(isDictionaryEncoding); } @Test - public void testBinaryRead() throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"binary_field_non_repeating"); - conf.set(IOConstants.COLUMNS_TYPES, "string"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - VectorizedParquetRecordReader reader = - createParquetReader("message test { required binary binary_field_non_repeating;}", conf); - VectorizedRowBatch previous = reader.createValue(); - int c = 0; - try { - while (reader.next(NullWritable.get(), previous)) { - BytesColumnVector vector = (BytesColumnVector) previous.cols[0]; - boolean noNull = true; - for (int i = 0; i < vector.vector.length; i++) { - if(c == nElements){ - break; - } - String actual; - assertEquals("Null assert failed at " + c, isNulls[c], vector.isNull[i]); - if (!vector.isNull[i]) { - actual = new String(ArrayUtils - .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i])); - assertEquals("failed at " + c, uniqueStrs[c], actual); - }else{ - noNull = false; - } - c++; - } - assertEquals("No Null check failed at " + c, noNull, vector.noNulls); - assertFalse(vector.isRepeating); - } - assertEquals("It doesn't exit at expected position", nElements, c); - } finally { - reader.close(); - } + public void decimalRead() throws Exception { + decimalRead(isDictionaryEncoding); } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java new file mode 100644 index 0000000..eecccce --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java @@ -0,0 +1,694 @@ +/** + * Licensed under the 
Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.parquet; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; + +import static junit.framework.Assert.assertTrue; +import static junit.framework.TestCase.assertFalse; +import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0; +import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; +import static org.junit.Assert.assertEquals; + +public class TestVectorizedColumnReaderBase { + + protected final static int nElements = 2500; + protected final static int UNIQUE_NUM = 10; + protected final static int NULL_FREQUENCY = 13; + + protected final 
static Configuration conf = new Configuration(); + protected final static Path file = new Path("target/test/TestParquetVectorReader/testParquetFile"); + + protected static final MessageType schema = parseMessageType( + "message hive_schema { " + + "required int32 int32_field; " + + "required int64 int64_field; " + + "required int96 int96_field; " + + "required double double_field; " + + "required float float_field; " + + "required boolean boolean_field; " + + "required fixed_len_byte_array(3) flba_field; " + + "optional fixed_len_byte_array(1) some_null_field; " + + "optional fixed_len_byte_array(1) all_null_field; " + + "required binary binary_field; " + + "optional binary binary_field_some_null; " + + "required binary value (DECIMAL(5,2)); " + + "required group struct_field {" + + " required int32 a;\n" + + " required double b;\n" + + "}\n" + + "optional group nested_struct_field {" + + " optional group nsf {" + + " optional int32 c;\n" + + " optional int32 d;\n" + + " }\n" + + " optional double e;\n" + + "}\n" + + "optional group struct_field_some_null {" + + " optional int32 f;\n" + + " optional double g;\n" + + "}\n" + + "optional group map_field (MAP) {\n" + + " repeated group map (MAP_KEY_VALUE) {\n" + + " required binary key;\n" + + " optional binary value;\n" + + " }\n" + + "}\n" + + "optional group array_list (LIST) {\n" + + " repeated group bag {\n" + + " optional int32 array_element;\n" + + " }\n" + + "}\n" + + "} "); + + protected static void removeFile() throws IOException { + FileSystem fs = file.getFileSystem(conf); + if (fs.exists(file)) { + fs.delete(file, true); + } + } + + protected static ParquetWriter initWriterFromFile() throws IOException { + GroupWriteSupport.setSchema(schema, conf); + return new ParquetWriter<>( + file, + new GroupWriteSupport(), + GZIP, 1024 * 1024, 1024, 1024 * 1024, + true, false, PARQUET_1_0, conf); + } + + protected static int getIntValue( + boolean isDictionaryEncoding, + int index) { + return isDictionaryEncoding ? index % UNIQUE_NUM : index; + } + + protected static double getDoubleValue( + boolean isDictionaryEncoding, + int index) { + return isDictionaryEncoding ? index % UNIQUE_NUM : index; + } + + protected static long getLongValue( + boolean isDictionaryEncoding, + int index) { + return isDictionaryEncoding ? (long) 2 * index % UNIQUE_NUM : (long) 2 * index; + } + + protected static float getFloatValue( + boolean isDictionaryEncoding, + int index) { + return (float) (isDictionaryEncoding ? index % UNIQUE_NUM * 2.0 : index * 2.0); + } + + protected static boolean getBooleanValue( + float index) { + return (index % 2 == 0); + } + + protected static String getTimestampStr(int index) { + String s = String.valueOf(index); + int l = 4 - s.length(); + for (int i = 0; i < l; i++) { + s = "0" + s; + } + return "99999999" + s; + } + + protected static HiveDecimal getDecimal( + boolean isDictionaryEncoding, + int index) { + int decimalVal = index % 100; + String decimalStr = (decimalVal < 10) ? "0" + String.valueOf(decimalVal) : String.valueOf + (decimalVal); + int intVal = (isDictionaryEncoding) ? index % UNIQUE_NUM : index / 100; + String d = String.valueOf(intVal) + decimalStr; + BigInteger bi = new BigInteger(d); + BigDecimal bd = new BigDecimal(bi); + return HiveDecimal.create(bd); + } + + protected static Binary getTimestamp( + boolean isDictionaryEncoding, + int index) { + String s = isDictionaryEncoding ? 
getTimestampStr(index % UNIQUE_NUM) : getTimestampStr(index); + return Binary.fromReusedByteArray(s.getBytes()); + } + + protected static String getStr( + boolean isDictionaryEncoding, + int index) { + int binaryLen = isDictionaryEncoding ? index % UNIQUE_NUM : index; + String v = ""; + while (binaryLen > 0) { + char t = (char) ('a' + binaryLen % 26); + binaryLen /= 26; + v = t + v; + } + return v; + } + + protected static Binary getBinaryValue( + boolean isDictionaryEncoding, + int index) { + return Binary.fromString(getStr(isDictionaryEncoding, index)); + } + + protected static boolean isNull(int index) { + return (index % NULL_FREQUENCY == 0); + } + + protected VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf) + throws IOException, InterruptedException, HiveException { + conf.set(PARQUET_READ_SCHEMA, schemaString); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp"); + + Job vectorJob = new Job(conf, "read vector"); + ParquetInputFormat.setInputPaths(vectorJob, file); + ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class); + InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0); + initialVectorizedRowBatchCtx(conf); + return new VectorizedParquetRecordReader(split, new JobConf(conf)); + } + + protected static void writeData(ParquetWriter writer, boolean isDictionaryEncoding) throws IOException { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + for (int i = 0; i < nElements; i++) { + boolean isNull = isNull(i); + int intVal = getIntValue(isDictionaryEncoding, i); + long longVal = getLongValue(isDictionaryEncoding, i); + Binary timeStamp = getTimestamp(isDictionaryEncoding, i); + HiveDecimal decimalVal = getDecimal(isDictionaryEncoding, i).setScale(2); + double doubleVal = getDoubleValue(isDictionaryEncoding, i); + float floatVal = getFloatValue(isDictionaryEncoding, i); + boolean booleanVal = getBooleanValue(i); + Binary binary = getBinaryValue(isDictionaryEncoding, i); + Group group = f.newGroup() + .append("int32_field", intVal) + .append("int64_field", longVal) + .append("int96_field", timeStamp) + .append("double_field", doubleVal) + .append("float_field", floatVal) + .append("boolean_field", booleanVal) + .append("flba_field", "abc"); + + if (!isNull) { + group.append("some_null_field", "x"); + } + + group.append("binary_field", binary); + + if (!isNull) { + group.append("binary_field_some_null", binary); + } + + HiveDecimalWritable w = new HiveDecimalWritable(decimalVal); + group.append("value", Binary.fromConstantByteArray(w.getInternalStorage())); + + group.addGroup("struct_field") + .append("a", intVal) + .append("b", doubleVal); + + Group g = group.addGroup("nested_struct_field"); + + g.addGroup("nsf").append("c", intVal).append("d", intVal); + g.append("e", doubleVal); + + Group some_null_g = group.addGroup("struct_field_some_null"); + if (i % 2 != 0) { + some_null_g.append("f", intVal); + } + if (i % 3 != 0) { + some_null_g.append("g", doubleVal); + } + + Group mapGroup = group.addGroup("map_field"); + if (i % 13 != 1) { + mapGroup.addGroup("map").append("key", binary).append("value", "abc"); + } else { + mapGroup.addGroup("map").append("key", binary); + } + + Group arrayGroup = group.addGroup("array_list"); + for (int j = 0; j < i % 4; j++) { + arrayGroup.addGroup("bag").append("array_element", intVal); + } + + writer.write(group); + } + writer.close(); + } + + private void 
initialVectorizedRowBatchCtx(Configuration conf) throws HiveException { + MapWork mapWork = new MapWork(); + VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx(); + rbCtx.init(createStructObjectInspector(conf), new String[0]); + mapWork.setVectorMode(true); + mapWork.setVectorizedRowBatchCtx(rbCtx); + Utilities.setMapWork(conf, mapWork); + } + + private StructObjectInspector createStructObjectInspector(Configuration conf) { + // Create row related objects + String columnNames = conf.get(IOConstants.COLUMNS); + List columnNamesList = DataWritableReadSupport.getColumnNames(columnNames); + String columnTypes = conf.get(IOConstants.COLUMNS_TYPES); + List columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes); + TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList); + return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo); + } + + protected void intRead(boolean isDictionaryEncoding) throws InterruptedException, HiveException, IOException { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"int32_field"); + conf.set(IOConstants.COLUMNS_TYPES,"int"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required int32 int32_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + LongColumnVector vector = (LongColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + assertEquals("Failed at " + c, getIntValue(isDictionaryEncoding, c), vector.vector[i]); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + protected void longRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "int64_field"); + conf.set(IOConstants.COLUMNS_TYPES, "bigint"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required int64 int64_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + LongColumnVector vector = (LongColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals("Failed at " + c, getLongValue(isDictionaryEncoding, c), vector.vector[i]); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + protected void doubleRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "double_field"); + conf.set(IOConstants.COLUMNS_TYPES, "double"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required double double_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + DoubleColumnVector vector = 
(DoubleColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals("Failed at " + c, getDoubleValue(isDictionaryEncoding, c), vector.vector[i], + 0); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + protected void floatRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "float_field"); + conf.set(IOConstants.COLUMNS_TYPES, "float"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required float float_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals("Failed at " + c, getFloatValue(isDictionaryEncoding, c), vector.vector[i], + 0); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + protected void booleanRead() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "boolean_field"); + conf.set(IOConstants.COLUMNS_TYPES, "boolean"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required boolean boolean_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + LongColumnVector vector = (LongColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals("Failed at " + c, (getBooleanValue(c) ? 
1 : 0), vector.vector[i]); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + protected void binaryRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "binary_field_some_null"); + conf.set(IOConstants.COLUMNS_TYPES, "string"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required binary binary_field_some_null;}", conf); + VectorizedRowBatch previous = reader.createValue(); + int c = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + BytesColumnVector vector = (BytesColumnVector) previous.cols[0]; + boolean noNull = true; + for (int i = 0; i < vector.vector.length; i++) { + if (c == nElements) { + break; + } + String actual; + assertEquals("Null assert failed at " + c, isNull(c), vector.isNull[i]); + if (!vector.isNull[i]) { + actual = new String(ArrayUtils + .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i])); + assertEquals("failed at " + c, getStr(isDictionaryEncoding, c), actual); + } else { + noNull = false; + } + c++; + } + assertEquals("No Null check failed at " + c, noNull, vector.noNulls); + assertFalse(vector.isRepeating); + } + assertEquals("It doesn't exit at expected position", nElements, c); + } finally { + reader.close(); + } + } + + protected void structRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "struct_field"); + conf.set(IOConstants.COLUMNS_TYPES, "struct<a:int,b:double>"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + String schema = "message hive_schema {\n" + + "group struct_field {\n" + + " optional int32 a;\n" + + " optional double b;\n" + + "}\n" + + "}\n"; + VectorizedParquetRecordReader reader = createParquetReader(schema, conf); + VectorizedRowBatch previous = reader.createValue(); + int c = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + StructColumnVector vector = (StructColumnVector) previous.cols[0]; + LongColumnVector cv = (LongColumnVector) vector.fields[0]; + DoubleColumnVector dv = (DoubleColumnVector) vector.fields[1]; + + for (int i = 0; i < cv.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals(getIntValue(isDictionaryEncoding, c), cv.vector[i]); + assertEquals(getDoubleValue(isDictionaryEncoding, c), dv.vector[i], 0); + assertFalse(vector.isNull[i]); + assertFalse(vector.isRepeating); + c++; + } + } + assertEquals("It doesn't exit at expected position", nElements, c); + } finally { + reader.close(); + } + } + + protected void nestedStructRead0(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "nested_struct_field"); + conf.set(IOConstants.COLUMNS_TYPES, "struct<nsf:struct<c:int,d:int>,e:double>"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + String schema = "message hive_schema {\n" + + "group nested_struct_field {\n" + + " optional group nsf {\n" + + " optional int32 c;\n" + + " optional int32 d;\n" + + " }" + + "optional double e;\n" + + "}\n"; + VectorizedParquetRecordReader reader = createParquetReader(schema, conf); + VectorizedRowBatch previous =
reader.createValue(); + int c = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + StructColumnVector vector = (StructColumnVector) previous.cols[0]; + StructColumnVector sv = (StructColumnVector) vector.fields[0]; + LongColumnVector cv = (LongColumnVector) sv.fields[0]; + LongColumnVector dv = (LongColumnVector) sv.fields[1]; + DoubleColumnVector ev = (DoubleColumnVector) vector.fields[1]; + + for (int i = 0; i < cv.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals(getIntValue(isDictionaryEncoding, c), cv.vector[i]); + assertEquals(getIntValue(isDictionaryEncoding, c), dv.vector[i]); + assertEquals(getDoubleValue(isDictionaryEncoding, c), ev.vector[i], 0); + assertFalse(vector.isNull[i]); + assertFalse(vector.isRepeating); + c++; + } + } + assertEquals("It doesn't exit at expected position", nElements, c); + } finally { + reader.close(); + } + } + + protected void nestedStructRead1(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "nested_struct_field"); + conf.set(IOConstants.COLUMNS_TYPES, "struct<nsf:struct<c:int>>"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + String schema = "message hive_schema {\n" + + "group nested_struct_field {\n" + + " optional group nsf {\n" + + " optional int32 c;\n" + + " }" + + "}\n"; + VectorizedParquetRecordReader reader = createParquetReader(schema, conf); + VectorizedRowBatch previous = reader.createValue(); + int c = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + StructColumnVector vector = (StructColumnVector) previous.cols[0]; + StructColumnVector sv = (StructColumnVector) vector.fields[0]; + LongColumnVector cv = (LongColumnVector) sv.fields[0]; + + for (int i = 0; i < cv.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals(getIntValue(isDictionaryEncoding, c), cv.vector[i]); + assertFalse(vector.isNull[i]); + assertFalse(vector.isRepeating); + c++; + } + } + assertEquals("It doesn't exit at expected position", nElements, c); + } finally { + reader.close(); + } + } + + protected void structReadSomeNull(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "struct_field_some_null"); + conf.set(IOConstants.COLUMNS_TYPES, "struct<f:int,g:double>"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + String schema = "message hive_schema {\n" + + "group struct_field_some_null {\n" + + " optional int32 f;\n" + + " optional double g;\n" + + "}\n"; + VectorizedParquetRecordReader reader = createParquetReader(schema, conf); + VectorizedRowBatch previous = reader.createValue(); + int c = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + StructColumnVector sv = (StructColumnVector) previous.cols[0]; + LongColumnVector fv = (LongColumnVector) sv.fields[0]; + DoubleColumnVector gv = (DoubleColumnVector) sv.fields[1]; + + for (int i = 0; i < fv.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals(c % 2 == 0, fv.isNull[i]); + assertEquals(c % 3 == 0, gv.isNull[i]); + assertEquals(c % /* 2*3 = */6 == 0, sv.isNull[i]); + if (!sv.isNull[i]) { + if (!fv.isNull[i]) { + assertEquals(getIntValue(isDictionaryEncoding, c), fv.vector[i]); + } + if (!gv.isNull[i]) { + assertEquals(getDoubleValue(isDictionaryEncoding, c), gv.vector[i], 0); + } + } + assertFalse(fv.isRepeating); +
c++; + } + } + assertEquals("It doesn't exit at expected position", nElements, c); + } finally { + reader.close(); + } + } + + protected void decimalRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "value"); + conf.set(IOConstants.COLUMNS_TYPES, "decimal(5,2)"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message hive_schema { required value (DECIMAL(5,2));}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + DecimalColumnVector vector = (DecimalColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals("Check failed at pos " + c, getDecimal(isDictionaryEncoding, c), + vector.vector[i].getHiveDecimal()); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java new file mode 100644 index 0000000..c6677cc --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.parquet; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +public class TestVectorizedDictionaryEncodingColumnReader extends TestVectorizedColumnReaderBase { + static boolean isDictionaryEncoding = true; + + @BeforeClass + public static void setup() throws IOException { + removeFile(); + writeData(initWriterFromFile(), isDictionaryEncoding); + } + + @AfterClass + public static void cleanup() throws IOException { + removeFile(); + } + + @Test + public void testIntRead() throws Exception { + intRead(isDictionaryEncoding); + } + + @Test + public void testLongRead() throws Exception { + longRead(isDictionaryEncoding); + } + + @Test + public void testDoubleRead() throws Exception { + doubleRead(isDictionaryEncoding); + } + + @Test + public void testFloatRead() throws Exception { + floatRead(isDictionaryEncoding); + } + + @Test + public void testBinaryRead() throws Exception { + binaryRead(isDictionaryEncoding); + } + + @Test + public void testStructRead() throws Exception { + structRead(isDictionaryEncoding); + } + + @Test + public void testNestedStructRead() throws Exception { + nestedStructRead0(isDictionaryEncoding); + nestedStructRead1(isDictionaryEncoding); + } + + @Test + public void structReadSomeNull() throws Exception { + structReadSomeNull(isDictionaryEncoding); + } + + @Test + public void decimalRead() throws Exception { + decimalRead(isDictionaryEncoding); + } +}
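Side note (not part of the patch): the isDictionaryEncoding flag only changes the data that setup() writes, never the assertions in the base class. When the flag is true, the value generators wrap around UNIQUE_NUM, so each column of the 2500-row test file holds at most 10 distinct values, which is the repetitive-data case the dictionary-encoding subclass is meant to exercise; when it is false, values grow with the row index. A minimal, self-contained sketch of that difference is below; the class name is illustrative only, and the re-inlined constants mirror UNIQUE_NUM and nElements from TestVectorizedColumnReaderBase.

public class DictionaryValueSketch {
  private static final int UNIQUE_NUM = 10;    // mirrors TestVectorizedColumnReaderBase.UNIQUE_NUM
  private static final int N_ELEMENTS = 2500;  // mirrors TestVectorizedColumnReaderBase.nElements

  // Same shape as getIntValue(boolean, int) in the test base class.
  static int getIntValue(boolean isDictionaryEncoding, int index) {
    return isDictionaryEncoding ? index % UNIQUE_NUM : index;
  }

  public static void main(String[] args) {
    long distinctDict = java.util.stream.IntStream.range(0, N_ELEMENTS)
        .map(i -> getIntValue(true, i)).distinct().count();
    long distinctPlain = java.util.stream.IntStream.range(0, N_ELEMENTS)
        .map(i -> getIntValue(false, i)).distinct().count();
    System.out.println("dictionary run, distinct int32 values: " + distinctDict);   // prints 10
    System.out.println("plain run, distinct int32 values:      " + distinctPlain);  // prints 2500
  }
}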