diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java index 907a9b8..9465720 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java @@ -14,10 +14,10 @@ package org.apache.hadoop.hive.ql.io.parquet.vector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.Dictionary; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.page.DataPage; import org.apache.parquet.column.page.DataPageV1; @@ -62,7 +62,7 @@ /** * The dictionary, if this column has dictionary encoding. */ - protected final Dictionary dictionary; + protected final ParquetDictionaryDataColumnReaderFactory.DictionaryParquetDataColumnReader dictionary; /** * If true, the current page is dictionary encoded. @@ -82,7 +82,7 @@ */ protected IntIterator repetitionLevelColumn; protected IntIterator definitionLevelColumn; - protected ValuesReader dataColumn; + protected ParquetDataColumnReader dataColumn; /** * Total values in the current page. @@ -92,22 +92,38 @@ protected final PageReader pageReader; protected final ColumnDescriptor descriptor; protected final Type type; + protected final TypeInfo hiveType; + + /** + * Used for VectorizedDummyColumnReader. + */ + public BaseVectorizedColumnReader(){ + this.pageReader = null; + this.descriptor = null; + this.type = null; + this.dictionary = null; + this.hiveType = null; + this.maxDefLevel = -1; + } public BaseVectorizedColumnReader( ColumnDescriptor descriptor, PageReader pageReader, boolean skipTimestampConversion, - Type type) throws IOException { + Type parquetType, TypeInfo hiveType) throws IOException { this.descriptor = descriptor; - this.type = type; + this.type = parquetType; this.pageReader = pageReader; this.maxDefLevel = descriptor.getMaxDefinitionLevel(); this.skipTimestampConversion = skipTimestampConversion; + this.hiveType = hiveType; DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); if (dictionaryPage != null) { try { - this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage); + this.dictionary = ParquetDictionaryDataColumnReaderFactory.getDataColumnReaderByType + (parquetType.asPrimitiveType().getPrimitiveTypeName(), dictionaryPage.getEncoding() + .initDictionary(descriptor, dictionaryPage)); this.isCurrentPageDictionaryEncoded = true; } catch (IOException e) { throw new IOException("could not decode the dictionary for " + descriptor, e); @@ -146,20 +162,24 @@ public Void visit(DataPageV2 dataPageV2) { }); } - private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException { + private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) + throws IOException { this.pageValueCount = valueCount; this.endOfPageValueCount = valuesRead + pageValueCount; if (dataEncoding.usesDictionary()) { this.dataColumn = null; - if (dictionary == null) { + if (dictionary.realReader == null) { throw new IOException( "could not read page in col " + descriptor + " as the dictionary was missing for encoding " + dataEncoding); } - dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, 
dictionary); + dataColumn = ParquetDataColumnReaderFactory.getDataColumnReaderByType(descriptor.getType(), + hiveType, dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary + .realReader), skipTimestampConversion); this.isCurrentPageDictionaryEncoded = true; } else { - dataColumn = dataEncoding.getValuesReader(descriptor, VALUES); + dataColumn = ParquetDataColumnReaderFactory.getDataColumnReaderByType(descriptor.getType(), + hiveType, dataEncoding.getValuesReader(descriptor, VALUES), skipTimestampConversion); this.isCurrentPageDictionaryEncoded = false; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java new file mode 100644 index 0000000..f0289fb --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import java.io.IOException; +import java.sql.Timestamp; + +public interface ParquetDataColumnReader { + void initFromPage(int valueCount, byte[] page, int offset) throws IOException; + + int readValueDictionaryId(); + + long readLong(); + + long readBoolean(); + + byte[] readBytes(); + + double readDouble(); + + Timestamp readTimestamp(); +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java new file mode 100644 index 0000000..a3065a0 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; +import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveType; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.sql.Timestamp; + +/** + * Parquet file has self-describing schema which may differ from the user required schema (e.g. + * schema evolution). This factory is used to retrieve user required typed data via corresponding + * reader which reads the underlying data. + */ +public final class ParquetDataColumnReaderFactory { + + private ParquetDataColumnReaderFactory() { + } + + /** + * The default data column reader for existing Parquet page reader which works for both + * dictionary or non dictionary types. + */ + public static class DefaultParquetDataColumnReader implements ParquetDataColumnReader { + protected ValuesReader realReader; + protected boolean skipTimestampConversion; + + public DefaultParquetDataColumnReader(ValuesReader realReader) { + this.realReader = realReader; + this.skipTimestampConversion = false; + } + + public DefaultParquetDataColumnReader(ValuesReader realReader, boolean skipTimestampConversion) { + this.realReader = realReader; + this.skipTimestampConversion = skipTimestampConversion; + } + + public void initFromPage(int i, ByteBuffer byteBuffer, int i1) throws IOException { + realReader.initFromPage(i, byteBuffer, i1); + } + + public void initFromPage(int valueCount, byte[] page, int offset) throws IOException { + this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); + } + + /** + * @return the next boolean from the page + */ + public long readBoolean() { + return realReader.readBoolean() ? 1 : 0; + } + + /** + * @return the next Binary from the page + */ + public byte[] readBytes() { + return realReader.readBytes().getBytesUnsafe(); + } + + /** + * @return the next float from the page + */ + public float readFloat() { + return realReader.readFloat(); + } + + /** + * @return the next double from the page + */ + public double readDouble() { + return realReader.readDouble(); + } + + @Override + public Timestamp readTimestamp() { + NanoTime nt = NanoTime.fromBinary(realReader.readBytes()); + return NanoTimeUtils.getTimestamp(nt, skipTimestampConversion); + } + + /** + * @return the next integer from the page + */ + public int readInteger() { + return realReader.readInteger(); + } + + /** + * @return the next long from the page + */ + public long readLong() { + return realReader.readLong(); + } + + public int readValueDictionaryId() { + return realReader.readValueDictionaryId(); + } + + public void skip() { + realReader.skip(); + } + } + + /** + * The reader who reads from the underlying int32 value value. 
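+ * Int32 values are widened on read, so a column stored as int32 in the file can be returned as the bigint, float, or double type requested by the table schema.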
Implementation is in consist with + * ETypeConverter EINT32_CONVERTER + */ + public static class TypesFromInt32PageReader extends DefaultParquetDataColumnReader { + + public TypesFromInt32PageReader(ValuesReader realReader) { + super(realReader); + } + + @Override + public long readLong() { + return realReader.readInteger(); + } + + @Override + public float readFloat() { + return realReader.readInteger(); + } + + @Override + public double readDouble() { + return realReader.readInteger(); + } + } + + /** + * The reader who reads from the underlying int64 value value. Implementation is in consist with + * ETypeConverter EINT64_CONVERTER + */ + public static class TypesFromInt64PageReader extends DefaultParquetDataColumnReader { + + public TypesFromInt64PageReader(ValuesReader realReader) { + super(realReader); + } + + @Override + public float readFloat() { + return realReader.readLong(); + } + + @Override + public double readDouble() { + return realReader.readLong(); + } + } + + /** + * The reader who reads from the underlying float value value. Implementation is in consist with + * ETypeConverter EFLOAT_CONVERTER + */ + public static class TypesFromFloatPageReader extends DefaultParquetDataColumnReader { + + public TypesFromFloatPageReader(ValuesReader realReader) { + super(realReader); + } + + @Override + public double readDouble() { + return realReader.readFloat(); + } + } + + public static class StringFromInt96PageReader extends DefaultParquetDataColumnReader { + + public StringFromInt96PageReader(ValuesReader realReader, boolean skipTimestampConversion) { + super(realReader, skipTimestampConversion); + } + + @Override + public byte[] readBytes() { + return new TimestampWritable(readTimestamp()).toString().getBytes(); + } + } + + public static ParquetDataColumnReader getDataColumnReaderByType(PrimitiveType + .PrimitiveTypeName + parquetType, TypeInfo hiveType, + ValuesReader realReader, boolean skipTimestampConversion) + throws IOException { + switch (parquetType) { + case INT32: + return new TypesFromInt32PageReader(realReader); + case INT64: + return new TypesFromInt64PageReader(realReader); + case FLOAT: + return new TypesFromFloatPageReader(realReader); + case INT96: + return getConvertorFromInt96((PrimitiveTypeInfo) hiveType, realReader, skipTimestampConversion); + case BOOLEAN: + case BINARY: + case DOUBLE: + case FIXED_LEN_BYTE_ARRAY: + return new DefaultParquetDataColumnReader(realReader); + default: + throw new IOException("Invalid category " + parquetType); + } + } + + private static ParquetDataColumnReader getConvertorFromInt96(PrimitiveTypeInfo hiveType, + ValuesReader realReader, boolean skipTimestampConversion) { + switch (hiveType.getPrimitiveCategory()){ + case STRING: + case CHAR: + case VARCHAR: + return new StringFromInt96PageReader(realReader, skipTimestampConversion); + case INT: + case BYTE: + case SHORT: + case DATE: + case INTERVAL_YEAR_MONTH: + case LONG: + case BOOLEAN: + case DOUBLE: + case BINARY: + case FLOAT: + case DECIMAL: + case TIMESTAMP: + case INTERVAL_DAY_TIME: + default: + throw new RuntimeException("Unsupported type conversion from Hive type: " + hiveType + " to " + + "Parquet int96"); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDictionaryDataColumnReaderFactory.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDictionaryDataColumnReaderFactory.java new file mode 100644 index 0000000..f8ab577 --- /dev/null +++ 
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDictionaryDataColumnReaderFactory.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveType; + +import java.io.IOException; + +/** + * Parquet file has self-describing schema which may differ from the user required schema (e.g. + * schema evolution). This factory is used to retrieve user required typed data via corresponding + * reader which reads the underlying data. + */ +public final class ParquetDictionaryDataColumnReaderFactory { + + private ParquetDictionaryDataColumnReaderFactory() { + } + + /** + * The default data column reader for existing Parquet page reader which works for both + * dictionary or non dictionary types. + */ + public static class DictionaryParquetDataColumnReader { + protected Dictionary realReader; + + public DictionaryParquetDataColumnReader(Dictionary realReader) { + this.realReader = realReader; + } + + public boolean readBoolean(int id) { + return realReader.decodeToBoolean(id); + } + + public Binary decodeToBinary(int id) { + return realReader.decodeToBinary(id); + } + + public float decodeToFloat(int id) { + return realReader.decodeToFloat(id); + } + + public double decodeToDouble(int id) { + return realReader.decodeToDouble(id); + } + + public int decodeToInt(int id) { + return realReader.decodeToInt(id); + } + + public long decodeToLong(int id) { + return realReader.decodeToLong(id); + } + + public boolean decodeToBoolean(int id){ + return realReader.decodeToBoolean(id); + } + } + + /** + * The reader who reads from the underlying int32 value value. Implementation is in consist with + * ETypeConverter EINT32_CONVERTER + */ + public static class TypesFromInt32DictionaryPageReader extends DictionaryParquetDataColumnReader { + + public TypesFromInt32DictionaryPageReader(Dictionary realReader) { + super(realReader); + } + + @Override + public long decodeToLong(int id) { + return realReader.decodeToInt(id); + } + + @Override + public float decodeToFloat(int id) { + return realReader.decodeToInt(id); + } + + @Override + public double decodeToDouble(int id) { + return realReader.decodeToInt(id); + } + } + + /** + * The reader who reads from the underlying int64 value value. 
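+ * Dictionary entries stored as int64 are widened to float or double when the table schema requests those types.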
Implementation is in consist with + * ETypeConverter EINT64_CONVERTER + */ + public static class TypesFromInt64DictionaryPageReader extends DictionaryParquetDataColumnReader { + + public TypesFromInt64DictionaryPageReader(Dictionary realReader) { + super(realReader); + } + + public float decodeToFloat(int id) { + return realReader.decodeToLong(id); + } + + public double decodeToDouble(int id) { + return realReader.decodeToLong(id); + } + } + + /** + * The reader who reads from the underlying float value value. Implementation is in consist with + * ETypeConverter EFLOAT_CONVERTER + */ + public static class TypesFromFloatDictionaryPageReader extends DictionaryParquetDataColumnReader { + + public TypesFromFloatDictionaryPageReader(Dictionary realReader) { + super(realReader); + } + + public double decodeToDouble(int id) { + return realReader.decodeToFloat(id); + } + } + + public static DictionaryParquetDataColumnReader getDataColumnReaderByType( + PrimitiveType.PrimitiveTypeName parquetType, Dictionary realReader) throws IOException { + switch (parquetType) { + case INT32: + return new TypesFromInt32DictionaryPageReader(realReader); + case INT64: + return new TypesFromInt64DictionaryPageReader(realReader); + case FLOAT: + return new TypesFromFloatDictionaryPageReader(realReader); + case BOOLEAN: + case BINARY: + case DOUBLE: + case INT96: + case FIXED_LEN_BYTE_ARRAY: + return new DictionaryParquetDataColumnReader(realReader); + default: + throw new IOException("Invalid category " + parquetType); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java new file mode 100644 index 0000000..ee1d692 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +import java.io.IOException; +import java.util.Arrays; + +/** + * A dummy vectorized parquet reader to support schema evolution. 
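+ * It is used when a column exists in the table schema but not in the Parquet file schema (for example after ALTER TABLE ... ADD COLUMNS), and it fills every requested batch with nulls.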
+ */ +public class VectorizedDummyColumnReader extends BaseVectorizedColumnReader { + + public VectorizedDummyColumnReader() { + super(); + } + + @Override + public void readBatch(int total, ColumnVector column, TypeInfo columnType) throws IOException { + Arrays.fill(column.isNull, true); + column.isRepeating = true; + column.noNulls = false; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java index c36640d..dfe85e3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java @@ -46,8 +46,8 @@ boolean isFirstRow = true; public VectorizedListColumnReader(ColumnDescriptor descriptor, PageReader pageReader, - boolean skipTimestampConversion, Type type) throws IOException { - super(descriptor, pageReader, skipTimestampConversion, type); + boolean skipTimestampConversion, Type type, TypeInfo hiveType) throws IOException { + super(descriptor, pageReader, skipTimestampConversion, type, hiveType); } @Override @@ -147,24 +147,22 @@ private Object readPrimitiveTypedRow(PrimitiveObjectInspector.PrimitiveCategory case INT: case BYTE: case SHORT: - return dataColumn.readInteger(); + case LONG: + return dataColumn.readLong(); case DATE: case INTERVAL_YEAR_MONTH: - case LONG: - return dataColumn.readLong(); case BOOLEAN: - return dataColumn.readBoolean() ? 1 : 0; - case DOUBLE: - return dataColumn.readDouble(); + return dataColumn.readBoolean(); case BINARY: case STRING: case CHAR: case VARCHAR: - return dataColumn.readBytes().getBytesUnsafe(); + return dataColumn.readBytes(); case FLOAT: - return dataColumn.readFloat(); + case DOUBLE: + return dataColumn.readDouble(); case DECIMAL: - return dataColumn.readBytes().getBytesUnsafe(); + return dataColumn.readBytes(); case INTERVAL_DAY_TIME: case TIMESTAMP: default: diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index 08ac57b..651fcd5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -1,9 +1,13 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -11,9 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hive.ql.io.parquet.vector; -import com.google.common.annotations.VisibleForTesting; +package org.apache.hadoop.hive.ql.io.parquet.vector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -474,9 +477,13 @@ private VectorizedColumnReader buildVectorizedParquetReader( if (columnDescriptors == null || columnDescriptors.isEmpty()) { throw new RuntimeException( "Failed to find related Parquet column descriptor with type " + type); - } else { + } + if (fileSchema.getColumns().contains(descriptors.get(0))) { return new VectorizedPrimitiveColumnReader(descriptors.get(0), - pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type); + pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type, typeInfo); + } else { + // Support for schema evolution + return new VectorizedDummyColumnReader(); } case STRUCT: StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo; @@ -503,7 +510,7 @@ private VectorizedColumnReader buildVectorizedParquetReader( "Failed to find related Parquet column descriptor with type " + type); } return new VectorizedListColumnReader(descriptors.get(0), - pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type); + pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type, typeInfo); case MAP: if (columnDescriptors == null || columnDescriptors.isEmpty()) { throw new RuntimeException( @@ -535,10 +542,10 @@ private VectorizedColumnReader buildVectorizedParquetReader( List kvTypes = groupType.getFields(); VectorizedListColumnReader keyListColumnReader = new VectorizedListColumnReader( descriptors.get(0), pages.getPageReader(descriptors.get(0)), skipTimestampConversion, - kvTypes.get(0)); + kvTypes.get(0), typeInfo); VectorizedListColumnReader valueListColumnReader = new VectorizedListColumnReader( descriptors.get(1), pages.getPageReader(descriptors.get(1)), skipTimestampConversion, - kvTypes.get(1)); + kvTypes.get(1), typeInfo); return new VectorizedMapColumnReader(keyListColumnReader, valueListColumnReader); case UNION: default: diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java index 39689f1..ed31344 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java @@ -41,8 +41,8 @@ public VectorizedPrimitiveColumnReader( ColumnDescriptor descriptor, PageReader pageReader, boolean skipTimestampConversion, - Type type) throws IOException { - super(descriptor, pageReader, skipTimestampConversion, type); + Type type, TypeInfo hiveType) throws IOException { + super(descriptor, pageReader, skipTimestampConversion, type, hiveType); } @Override @@ -64,7 +64,7 @@ public void readBatch( LongColumnVector dictionaryIds = new LongColumnVector(); // Read and decode dictionary ids. 
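+ // Pass the requested Hive column type to decodeDictionaryIds so the dictionary values can be converted to the type in the table schema.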
readDictionaryIDs(num, dictionaryIds, rowId); - decodeDictionaryIds(rowId, num, column, dictionaryIds); + decodeDictionaryIds(rowId, num, column, columnType, dictionaryIds); } else { // assign values in vector readBatchHelper(num, column, columnType, rowId); @@ -148,7 +148,7 @@ private void readIntegers( while (left > 0) { readRepetitionAndDefinitionLevels(); if (definitionLevel >= maxDefLevel) { - c.vector[rowId] = dataColumn.readInteger(); + c.vector[rowId] = dataColumn.readLong(); c.isNull[rowId] = false; c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); } else { @@ -190,7 +190,7 @@ private void readBooleans( while (left > 0) { readRepetitionAndDefinitionLevels(); if (definitionLevel >= maxDefLevel) { - c.vector[rowId] = dataColumn.readBoolean() ? 1 : 0; + c.vector[rowId] = dataColumn.readBoolean(); c.isNull[rowId] = false; c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); } else { @@ -232,7 +232,7 @@ private void readFloats( while (left > 0) { readRepetitionAndDefinitionLevels(); if (definitionLevel >= maxDefLevel) { - c.vector[rowId] = dataColumn.readFloat(); + c.vector[rowId] = dataColumn.readDouble(); c.isNull[rowId] = false; c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); } else { @@ -255,7 +255,7 @@ private void readDecimal( while (left > 0) { readRepetitionAndDefinitionLevels(); if (definitionLevel >= maxDefLevel) { - c.vector[rowId].set(dataColumn.readBytes().getBytesUnsafe(), c.scale); + c.vector[rowId].set(dataColumn.readBytes(), c.scale); c.isNull[rowId] = false; c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); } else { @@ -276,7 +276,7 @@ private void readBinaries( while (left > 0) { readRepetitionAndDefinitionLevels(); if (definitionLevel >= maxDefLevel) { - c.setVal(rowId, dataColumn.readBytes().getBytesUnsafe()); + c.setVal(rowId, dataColumn.readBytes()); c.isNull[rowId] = false; // TODO figure out a better way to set repeat for Binary type c.isRepeating = false; @@ -298,9 +298,7 @@ private void readTimestamp(int total, TimestampColumnVector c, int rowId) throws switch (descriptor.getType()) { //INT64 is not yet supported case INT96: - NanoTime nt = NanoTime.fromBinary(dataColumn.readBytes()); - Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion); - c.set(rowId, ts); + c.set(rowId, dataColumn.readTimestamp()); break; default: throw new IOException( @@ -322,11 +320,12 @@ private void readTimestamp(int total, TimestampColumnVector c, int rowId) throws /** * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`. 
*/ - private void decodeDictionaryIds( - int rowId, - int num, - ColumnVector column, - LongColumnVector dictionaryIds) { + private void decodeDictionaryIds2( + int rowId, + int num, + ColumnVector column, + TypeInfo columnType, + LongColumnVector dictionaryIds) { System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num); if (column.noNulls) { column.noNulls = dictionaryIds.noNulls; @@ -337,25 +336,25 @@ private void decodeDictionaryIds( case INT32: for (int i = rowId; i < rowId + num; ++i) { ((LongColumnVector) column).vector[i] = - dictionary.decodeToInt((int) dictionaryIds.vector[i]); + dictionary.decodeToInt((int) dictionaryIds.vector[i]); } break; case INT64: for (int i = rowId; i < rowId + num; ++i) { ((LongColumnVector) column).vector[i] = - dictionary.decodeToLong((int) dictionaryIds.vector[i]); + dictionary.decodeToLong((int) dictionaryIds.vector[i]); } break; case FLOAT: for (int i = rowId; i < rowId + num; ++i) { ((DoubleColumnVector) column).vector[i] = - dictionary.decodeToFloat((int) dictionaryIds.vector[i]); + dictionary.decodeToFloat((int) dictionaryIds.vector[i]); } break; case DOUBLE: for (int i = rowId; i < rowId + num; ++i) { ((DoubleColumnVector) column).vector[i] = - dictionary.decodeToDouble((int) dictionaryIds.vector[i]); + dictionary.decodeToDouble((int) dictionaryIds.vector[i]); } break; case INT96: @@ -374,17 +373,17 @@ private void decodeDictionaryIds( if (column instanceof BytesColumnVector) { for (int i = rowId; i < rowId + num; ++i) { ((BytesColumnVector) column) - .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe()); + .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe()); } } else { DecimalColumnVector decimalColumnVector = ((DecimalColumnVector) column); decimalColumnVector.precision = - (short) type.asPrimitiveType().getDecimalMetadata().getPrecision(); + (short) type.asPrimitiveType().getDecimalMetadata().getPrecision(); decimalColumnVector.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale(); for (int i = rowId; i < rowId + num; ++i) { decimalColumnVector.vector[i] - .set(dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe(), - decimalColumnVector.scale); + .set(dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe(), + decimalColumnVector.scale); } } break; @@ -392,4 +391,94 @@ private void decodeDictionaryIds( throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType()); } } + + /** + * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`. 
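+ * The target vector type is derived from the Hive column type rather than the Parquet physical type, which allows widening (e.g. int32 read as bigint) and conversions such as int96 to timestamp during schema evolution.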
+ */ + private void decodeDictionaryIds( + int rowId, + int num, + ColumnVector column, + TypeInfo columnType, + LongColumnVector dictionaryIds) { + System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num); + if (column.noNulls) { + column.noNulls = dictionaryIds.noNulls; + } + column.isRepeating = column.isRepeating && dictionaryIds.isRepeating; + + + PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType; + + switch (primitiveColumnType.getPrimitiveCategory()) { + case INT: + case BYTE: + case SHORT: + for (int i = rowId; i < rowId + num; ++i) { + ((LongColumnVector) column).vector[i] = + dictionary.decodeToInt((int) dictionaryIds.vector[i]); + } + break; + case DATE: + case INTERVAL_YEAR_MONTH: + case LONG: + for (int i = rowId; i < rowId + num; ++i) { + ((LongColumnVector) column).vector[i] = + dictionary.decodeToLong((int) dictionaryIds.vector[i]); + } + break; + case BOOLEAN: + for (int i = rowId; i < rowId + num; ++i) { + ((LongColumnVector) column).vector[i] = + dictionary.decodeToBoolean((int) dictionaryIds.vector[i]) ? 1 : 0; + } + break; + case DOUBLE: + for (int i = rowId; i < rowId + num; ++i) { + ((DoubleColumnVector) column).vector[i] = + dictionary.decodeToDouble((int) dictionaryIds.vector[i]); + } + break; + case BINARY: + case STRING: + case CHAR: + case VARCHAR: + for (int i = rowId; i < rowId + num; ++i) { + ((BytesColumnVector) column) + .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe()); + } + break; + case FLOAT: + for (int i = rowId; i < rowId + num; ++i) { + ((DoubleColumnVector) column).vector[i] = + dictionary.decodeToFloat((int) dictionaryIds.vector[i]); + } + break; + case DECIMAL: + DecimalColumnVector decimalColumnVector = ((DecimalColumnVector) column); + decimalColumnVector.precision = + (short) type.asPrimitiveType().getDecimalMetadata().getPrecision(); + decimalColumnVector.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale(); + for (int i = rowId; i < rowId + num; ++i) { + decimalColumnVector.vector[i] + .set(dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe(), + decimalColumnVector.scale); + } + break; + case TIMESTAMP: + for (int i = rowId; i < rowId + num; ++i) { + ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer(); + buf.order(ByteOrder.LITTLE_ENDIAN); + long timeOfDayNanos = buf.getLong(); + int julianDay = buf.getInt(); + NanoTime nt = new NanoTime(julianDay, timeOfDayNanos); + Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion); + ((TimestampColumnVector) column).set(i, ts); + } + break; + case INTERVAL_DAY_TIME: + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java index 9e414dc..1988d4b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java @@ -55,11 +55,16 @@ public static void cleanup() throws IOException { @Test public void testIntRead() throws Exception { intRead(isDictionaryEncoding); + longReadInt(isDictionaryEncoding); + floatReadInt(isDictionaryEncoding); + doubleReadInt(isDictionaryEncoding); } @Test public void testLongRead() throws Exception { longRead(isDictionaryEncoding); + floatReadLong(isDictionaryEncoding); + 
doubleReadLong(isDictionaryEncoding); } @Test @@ -70,6 +75,7 @@ public void testDoubleRead() throws Exception { @Test public void testFloatRead() throws Exception { floatRead(isDictionaryEncoding); + doubleReadFloat(isDictionaryEncoding); } @Test diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java index 3e5d831..8c5ad14 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java @@ -41,11 +41,16 @@ public static void cleanup() throws IOException { @Test public void testIntRead() throws Exception { intRead(isDictionaryEncoding); + longReadInt(isDictionaryEncoding); + floatReadInt(isDictionaryEncoding); + doubleReadInt(isDictionaryEncoding); } @Test public void testLongRead() throws Exception { longRead(isDictionaryEncoding); + floatReadLong(isDictionaryEncoding); + doubleReadLong(isDictionaryEncoding); } @Test @@ -56,6 +61,7 @@ public void testDoubleRead() throws Exception { @Test public void testFloatRead() throws Exception { floatRead(isDictionaryEncoding); + doubleReadFloat(isDictionaryEncoding); } @Test diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java index 5d3ebd6..a81d6f9 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.IOConstants; @@ -358,13 +357,87 @@ private static StructObjectInspector createStructObjectInspector(Configuration c return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo); } - protected void intRead(boolean isDictionaryEncoding) throws InterruptedException, HiveException, IOException { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"int32_field"); - conf.set(IOConstants.COLUMNS_TYPES,"int"); + protected void floatReadInt(boolean isDictionaryEncoding) throws InterruptedException, + HiveException, IOException { + conf.set(IOConstants.COLUMNS, "int32_field"); + conf.set(IOConstants.COLUMNS_TYPES, "float"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = createTestParquetReader("message test { required int32" + + " int32_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + assertEquals("Failed at " + c, getIntValue(isDictionaryEncoding, c), vector.vector[i], 0); + assertFalse(vector.isNull[i]); + c++; + } + } + 
assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + protected void doubleReadInt(boolean isDictionaryEncoding) throws InterruptedException, + HiveException, IOException { + conf.set(IOConstants.COLUMNS, "int32_field"); + conf.set(IOConstants.COLUMNS_TYPES, "double"); conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); VectorizedParquetRecordReader reader = + createTestParquetReader("message test { required int32 int32_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + assertEquals("Failed at " + c, getIntValue(isDictionaryEncoding, c), vector.vector[i], 0); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + protected void longReadInt(boolean isDictionaryEncoding) throws InterruptedException, + HiveException, IOException { + Configuration c = new Configuration(); + c.set(IOConstants.COLUMNS, "int32_field"); + c.set(IOConstants.COLUMNS_TYPES, "bigint"); + c.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + c.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + intRead(isDictionaryEncoding, c); + } + + protected void intRead(boolean isDictionaryEncoding) throws InterruptedException, + HiveException, IOException { + Configuration c = new Configuration(); + c.set(IOConstants.COLUMNS, "int32_field"); + c.set(IOConstants.COLUMNS_TYPES, "int"); + c.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + c.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + intRead(isDictionaryEncoding, c); + } + + private void intRead(boolean isDictionaryEncoding, Configuration conf) throws + InterruptedException, HiveException, IOException { + VectorizedParquetRecordReader reader = createTestParquetReader("message test { required int32 int32_field;}", conf); VectorizedRowBatch previous = reader.createValue(); try { @@ -387,12 +460,76 @@ protected void intRead(boolean isDictionaryEncoding) throws InterruptedException } } + protected void floatReadLong(boolean isDictionaryEncoding) throws Exception { + Configuration c = new Configuration(); + c.set(IOConstants.COLUMNS, "int64_field"); + c.set(IOConstants.COLUMNS_TYPES, "float"); + c.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + c.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createTestParquetReader("message test { required int64 int64_field;}", c); + VectorizedRowBatch previous = reader.createValue(); + try { + int count = 0; + while (reader.next(NullWritable.get(), previous)) { + DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if (count == nElements) { + break; + } + assertEquals("Failed at " + count, getLongValue(isDictionaryEncoding, count), vector + .vector[i], 0); + assertFalse(vector.isNull[i]); + count++; + } + } + assertEquals(nElements, count); + } finally { + reader.close(); + } + } + + protected void doubleReadLong(boolean isDictionaryEncoding) throws Exception { + Configuration c = new Configuration(); + c.set(IOConstants.COLUMNS, "int64_field"); + c.set(IOConstants.COLUMNS_TYPES, "double"); + 
c.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + c.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createTestParquetReader("message test { required int64 int64_field;}", c); + VectorizedRowBatch previous = reader.createValue(); + try { + int count = 0; + while (reader.next(NullWritable.get(), previous)) { + DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if (count == nElements) { + break; + } + assertEquals("Failed at " + count, getLongValue(isDictionaryEncoding, count), vector + .vector[i], 0); + assertFalse(vector.isNull[i]); + count++; + } + } + assertEquals(nElements, count); + } finally { + reader.close(); + } + } + protected void longRead(boolean isDictionaryEncoding) throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS, "int64_field"); - conf.set(IOConstants.COLUMNS_TYPES, "bigint"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + Configuration c = new Configuration(); + c.set(IOConstants.COLUMNS, "int64_field"); + c.set(IOConstants.COLUMNS_TYPES, "bigint"); + c.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + c.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + longRead(isDictionaryEncoding, c); + } + + private void longRead(boolean isDictionaryEncoding, Configuration conf) throws Exception { VectorizedParquetRecordReader reader = createTestParquetReader("message test { required int64 int64_field;}", conf); VectorizedRowBatch previous = reader.createValue(); @@ -417,11 +554,15 @@ protected void longRead(boolean isDictionaryEncoding) throws Exception { } protected void doubleRead(boolean isDictionaryEncoding) throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS, "double_field"); - conf.set(IOConstants.COLUMNS_TYPES, "double"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + Configuration c = new Configuration(); + c.set(IOConstants.COLUMNS, "double_field"); + c.set(IOConstants.COLUMNS_TYPES, "double"); + c.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + c.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + doubleRead(isDictionaryEncoding, c); + } + + private void doubleRead(boolean isDictionaryEncoding, Configuration conf) throws Exception { VectorizedParquetRecordReader reader = createTestParquetReader("message test { required double double_field;}", conf); VectorizedRowBatch previous = reader.createValue(); @@ -447,13 +588,26 @@ protected void doubleRead(boolean isDictionaryEncoding) throws Exception { } protected void floatRead(boolean isDictionaryEncoding) throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS, "float_field"); - conf.set(IOConstants.COLUMNS_TYPES, "float"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + Configuration c = new Configuration(); + c.set(IOConstants.COLUMNS, "float_field"); + c.set(IOConstants.COLUMNS_TYPES, "float"); + c.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + c.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + floatRead(isDictionaryEncoding, c); + } + + protected void doubleReadFloat(boolean isDictionaryEncoding) throws Exception { + 
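+ // Read the Parquet float_field column with a Hive column type of double to exercise the float-to-double widening path.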
Configuration c = new Configuration(); + c.set(IOConstants.COLUMNS, "float_field"); + c.set(IOConstants.COLUMNS_TYPES, "double"); + c.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + c.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + floatRead(isDictionaryEncoding, c); + } + + private void floatRead(boolean isDictionaryEncoding, Configuration conf) throws Exception { VectorizedParquetRecordReader reader = - createTestParquetReader("message test { required float float_field;}", conf); + createTestParquetReader("message test { required float float_field;}", conf); VectorizedRowBatch previous = reader.createValue(); try { int c = 0; @@ -465,7 +619,7 @@ protected void floatRead(boolean isDictionaryEncoding) throws Exception { break; } assertEquals("Failed at " + c, getFloatValue(isDictionaryEncoding, c), vector.vector[i], - 0); + 0); assertFalse(vector.isNull[i]); c++; } diff --git ql/src/test/queries/clientpositive/schema_evol_par_vec_table_dictionary_encoding.q ql/src/test/queries/clientpositive/schema_evol_par_vec_table_dictionary_encoding.q new file mode 100644 index 0000000..6b706ab --- /dev/null +++ ql/src/test/queries/clientpositive/schema_evol_par_vec_table_dictionary_encoding.q @@ -0,0 +1,94 @@ +set hive.fetch.task.conversion=none; +set hive.vectorized.execution.enabled=true; +set parquet.enable.dictionary=true; + +drop table test_alter; +drop table test_alter2; +drop table test_alter3; + +create table test_alter (id string) stored as parquet; +insert into test_alter values ('1'), ('2'), ('3'); +select * from test_alter; + +-- add new column -> empty col values should return NULL +alter table test_alter add columns (newCol string); +select * from test_alter; + +-- insert data into new column -> New data should be returned +insert into test_alter values ('4', '100'); +select * from test_alter; + +-- remove the newly added column +-- this works in vectorized execution +alter table test_alter replace columns (id string); +select * from test_alter; + +-- add column using replace column syntax +alter table test_alter replace columns (id string, id2 string); +-- this surprisingly doesn't return the 100 added to 4th row above +select * from test_alter; +insert into test_alter values ('5', '100'); +select * from test_alter; + +-- use the same column name and datatype +alter table test_alter replace columns (id string, id2 string); +select * from test_alter; + +-- change string to char +alter table test_alter replace columns (id char(10), id2 string); +select * from test_alter; + +-- change string to varchar +alter table test_alter replace columns (id string, id2 string); +alter table test_alter replace columns (id varchar(10), id2 string); +select * from test_alter; + +-- change columntype and column name +alter table test_alter replace columns (id string, id2 string); +alter table test_alter replace columns (idv varchar(10), id2 string); +select * from test_alter; + +-- test int to long type conversion +create table test_alter2 (id int) stored as parquet; +insert into test_alter2 values (1); +alter table test_alter2 replace columns (id bigint); +select * from test_alter2; + +-- test float to double type conversion +drop table test_alter2; +create table test_alter2 (id float) stored as parquet; +insert into test_alter2 values (1.5); +alter table test_alter2 replace columns (id double); +select * from test_alter2; + +drop table test_alter2; +create table test_alter2 (ts timestamp) stored as parquet; +insert into test_alter2 values ('2018-01-01 13:14:15.123456'), 
('2018-01-02 14:15:16.123456'), ('2018-01-03 16:17:18.123456'); +select * from test_alter2; +alter table test_alter2 replace columns (ts string); +select * from test_alter2; + +drop table test_alter2; +create table test_alter2 (ts timestamp) stored as parquet; +insert into test_alter2 values ('2018-01-01 13:14:15.123456'), ('2018-01-02 14:15:16.123456'), ('2018-01-03 16:17:18.123456'); +select * from test_alter2; +alter table test_alter2 replace columns (ts varchar(19)); +-- this should truncate the microseconds +select * from test_alter2; + +drop table test_alter2; +create table test_alter2 (ts timestamp) stored as parquet; +insert into test_alter2 values ('2018-01-01 13:14:15.123456'), ('2018-01-02 14:15:16.123456'), ('2018-01-03 16:17:18.123456'); +select * from test_alter2; +alter table test_alter2 replace columns (ts char(25)); +select * from test_alter2; + +-- test integer types upconversion +create table test_alter3 (id1 tinyint, id2 smallint, id3 int, id4 bigint) stored as parquet; +insert into test_alter3 values (10, 20, 30, 40); +alter table test_alter3 replace columns (id1 smallint, id2 int, id3 bigint, id4 decimal(10,4)); +-- this fails mostly due to bigint to decimal +-- select * from test_alter3; +select id1, id2, id3 from test_alter3; + + diff --git ql/src/test/queries/clientpositive/schema_evol_par_vec_table_non_dictionary_encoding.q ql/src/test/queries/clientpositive/schema_evol_par_vec_table_non_dictionary_encoding.q new file mode 100644 index 0000000..3006bd4 --- /dev/null +++ ql/src/test/queries/clientpositive/schema_evol_par_vec_table_non_dictionary_encoding.q @@ -0,0 +1,94 @@ +set hive.fetch.task.conversion=none; +set hive.vectorized.execution.enabled=true; +set parquet.enable.dictionary=false; + +drop table test_alter; +drop table test_alter2; +drop table test_alter3; + +create table test_alter (id string) stored as parquet; +insert into test_alter values ('1'), ('2'), ('3'); +select * from test_alter; + +-- add new column -> empty col values should return NULL +alter table test_alter add columns (newCol string); +select * from test_alter; + +-- insert data into new column -> New data should be returned +insert into test_alter values ('4', '100'); +select * from test_alter; + +-- remove the newly added column +-- this works in vectorized execution +alter table test_alter replace columns (id string); +select * from test_alter; + +-- add column using replace column syntax +alter table test_alter replace columns (id string, id2 string); +-- this surprisingly doesn't return the 100 added to 4th row above +select * from test_alter; +insert into test_alter values ('5', '100'); +select * from test_alter; + +-- use the same column name and datatype +alter table test_alter replace columns (id string, id2 string); +select * from test_alter; + +-- change string to char +alter table test_alter replace columns (id char(10), id2 string); +select * from test_alter; + +-- change string to varchar +alter table test_alter replace columns (id string, id2 string); +alter table test_alter replace columns (id varchar(10), id2 string); +select * from test_alter; + +-- change columntype and column name +alter table test_alter replace columns (id string, id2 string); +alter table test_alter replace columns (idv varchar(10), id2 string); +select * from test_alter; + +-- test int to long type conversion +create table test_alter2 (id int) stored as parquet; +insert into test_alter2 values (1); +alter table test_alter2 replace columns (id bigint); +select * from test_alter2; + +-- test float to 
double type conversion +drop table test_alter2; +create table test_alter2 (id float) stored as parquet; +insert into test_alter2 values (1.5); +alter table test_alter2 replace columns (id double); +select * from test_alter2; + +drop table test_alter2; +create table test_alter2 (ts timestamp) stored as parquet; +insert into test_alter2 values ('2018-01-01 13:14:15.123456'), ('2018-01-02 14:15:16.123456'), ('2018-01-03 16:17:18.123456'); +select * from test_alter2; +alter table test_alter2 replace columns (ts string); +select * from test_alter2; + +drop table test_alter2; +create table test_alter2 (ts timestamp) stored as parquet; +insert into test_alter2 values ('2018-01-01 13:14:15.123456'), ('2018-01-02 14:15:16.123456'), ('2018-01-03 16:17:18.123456'); +select * from test_alter2; +alter table test_alter2 replace columns (ts varchar(19)); +-- this should truncate the microseconds +select * from test_alter2; + +drop table test_alter2; +create table test_alter2 (ts timestamp) stored as parquet; +insert into test_alter2 values ('2018-01-01 13:14:15.123456'), ('2018-01-02 14:15:16.123456'), ('2018-01-03 16:17:18.123456'); +select * from test_alter2; +alter table test_alter2 replace columns (ts char(25)); +select * from test_alter2; + +-- test integer types upconversion +create table test_alter3 (id1 tinyint, id2 smallint, id3 int, id4 bigint) stored as parquet; +insert into test_alter3 values (10, 20, 30, 40); +alter table test_alter3 replace columns (id1 smallint, id2 int, id3 bigint, id4 decimal(10,4)); +-- this fails mostly due to bigint to decimal +-- select * from test_alter3; +select id1, id2, id3 from test_alter3; + + diff --git ql/src/test/results/clientpositive/schema_evol_par_vec_table.q.out ql/src/test/results/clientpositive/schema_evol_par_vec_table.q.out new file mode 100644 index 0000000..a6128b6 --- /dev/null +++ ql/src/test/results/clientpositive/schema_evol_par_vec_table.q.out @@ -0,0 +1,357 @@ +PREHOOK: query: drop table test_alter +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table test_alter +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table test_alter2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table test_alter2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table test_alter3 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table test_alter3 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table test_alter (id string) stored as parquet +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@test_alter +POSTHOOK: query: create table test_alter (id string) stored as parquet +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@test_alter +PREHOOK: query: insert into test_alter values ('1'), ('2'), ('3') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@test_alter +POSTHOOK: query: insert into test_alter values ('1'), ('2'), ('3') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@test_alter +POSTHOOK: Lineage: test_alter.id SCRIPT [] +PREHOOK: query: select * from test_alter +PREHOOK: type: QUERY +PREHOOK: Input: default@test_alter +#### A masked pattern was here #### +POSTHOOK: query: select * from test_alter +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_alter +#### A masked pattern was here #### +1 +2 +3 +PREHOOK: query: alter table test_alter add columns (newCol string) +PREHOOK: type: ALTERTABLE_ADDCOLS +PREHOOK: Input: default@test_alter +PREHOOK: Output: default@test_alter +POSTHOOK: 
query: alter table test_alter add columns (newCol string)
+POSTHOOK: type: ALTERTABLE_ADDCOLS
+POSTHOOK: Input: default@test_alter
+POSTHOOK: Output: default@test_alter
+PREHOOK: query: select * from test_alter
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_alter
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+1 NULL
+2 NULL
+3 NULL
+PREHOOK: query: insert into test_alter values ('4', '100')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@test_alter
+POSTHOOK: query: insert into test_alter values ('4', '100')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@test_alter
+POSTHOOK: Lineage: test_alter.id SCRIPT []
+POSTHOOK: Lineage: test_alter.newcol SCRIPT []
+PREHOOK: query: select * from test_alter
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_alter
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+1 NULL
+2 NULL
+3 NULL
+4 100
+PREHOOK: query: alter table test_alter replace columns (id string)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@test_alter
+PREHOOK: Output: default@test_alter
+POSTHOOK: query: alter table test_alter replace columns (id string)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@test_alter
+POSTHOOK: Output: default@test_alter
+PREHOOK: query: select * from test_alter
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_alter
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+1
+2
+3
+4
+PREHOOK: query: alter table test_alter replace columns (id string, id2 string)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@test_alter
+PREHOOK: Output: default@test_alter
+POSTHOOK: query: alter table test_alter replace columns (id string, id2 string)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@test_alter
+POSTHOOK: Output: default@test_alter
+PREHOOK: query: select * from test_alter
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_alter
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+1 NULL
+2 NULL
+3 NULL
+4 NULL
+PREHOOK: query: insert into test_alter values ('5', '100')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@test_alter
+POSTHOOK: query: insert into test_alter values ('5', '100')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@test_alter
+POSTHOOK: Lineage: test_alter.id SCRIPT []
+POSTHOOK: Lineage: test_alter.id2 SCRIPT []
+PREHOOK: query: select * from test_alter
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_alter
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+5 100
+1 NULL
+2 NULL
+3 NULL
+4 NULL
+PREHOOK: query: alter table test_alter replace columns (id string, id2 string)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@test_alter
+PREHOOK: Output: default@test_alter
+POSTHOOK: query: alter table test_alter replace columns (id string, id2 string)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@test_alter
+POSTHOOK: Output: default@test_alter
+PREHOOK: query: select * from test_alter
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_alter
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+5 100
+1 NULL
+2 NULL
+3 NULL
+4 NULL
+PREHOOK: query: alter table test_alter replace columns (id char(10), id2 string)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@test_alter
+PREHOOK: Output: default@test_alter
+POSTHOOK: query: alter table test_alter replace columns (id char(10), id2 string)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@test_alter
+POSTHOOK: Output: default@test_alter
+PREHOOK: query: select * from test_alter
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_alter
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+5 100
+1 NULL
+2 NULL
+3 NULL
+4 NULL
+PREHOOK: query: alter table test_alter replace columns (id string, id2 string)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@test_alter
+PREHOOK: Output: default@test_alter
+POSTHOOK: query: alter table test_alter replace columns (id string, id2 string)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@test_alter
+POSTHOOK: Output: default@test_alter
+PREHOOK: query: alter table test_alter replace columns (id varchar(10), id2 string)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@test_alter
+PREHOOK: Output: default@test_alter
+POSTHOOK: query: alter table test_alter replace columns (id varchar(10), id2 string)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@test_alter
+POSTHOOK: Output: default@test_alter
+PREHOOK: query: select * from test_alter
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_alter
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+5 100
+1 NULL
+2 NULL
+3 NULL
+4 NULL
+PREHOOK: query: alter table test_alter replace columns (id string, id2 string)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@test_alter
+PREHOOK: Output: default@test_alter
+POSTHOOK: query: alter table test_alter replace columns (id string, id2 string)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@test_alter
+POSTHOOK: Output: default@test_alter
+PREHOOK: query: alter table test_alter replace columns (idv varchar(10), id2 string)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@test_alter
+PREHOOK: Output: default@test_alter
+POSTHOOK: query: alter table test_alter replace columns (idv varchar(10), id2 string)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@test_alter
+POSTHOOK: Output: default@test_alter
+PREHOOK: query: select * from test_alter
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_alter
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_alter
+#### A masked pattern was here ####
+NULL 100
+NULL NULL
+NULL NULL
+NULL NULL
+NULL NULL
+PREHOOK: query: create table test_alter2 (id int) stored as parquet
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test_alter2
+POSTHOOK: query: create table test_alter2 (id int) stored as parquet
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test_alter2
+PREHOOK: query: insert into test_alter2 values (1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@test_alter2
+POSTHOOK: query: insert into test_alter2 values (1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@test_alter2
+POSTHOOK: Lineage: test_alter2.id SCRIPT []
+PREHOOK: query: alter table test_alter2 replace columns (id bigint)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@test_alter2
+PREHOOK: Output: default@test_alter2
+POSTHOOK: query: alter table test_alter2 replace columns (id bigint)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@test_alter2
+POSTHOOK: Output: default@test_alter2
+PREHOOK: query: select * from test_alter2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_alter2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_alter2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_alter2
+#### A masked pattern was here ####
+1
+PREHOOK: query: drop table test_alter2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@test_alter2
+PREHOOK: Output: default@test_alter2
+POSTHOOK: query: drop table test_alter2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@test_alter2
+POSTHOOK: Output: default@test_alter2
+PREHOOK: query: create table test_alter2 (id float) stored as parquet
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test_alter2
+POSTHOOK: query: create table test_alter2 (id float) stored as parquet
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test_alter2
+PREHOOK: query: insert into test_alter2 values (1.5)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@test_alter2
+POSTHOOK: query: insert into test_alter2 values (1.5)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@test_alter2
+POSTHOOK: Lineage: test_alter2.id SCRIPT []
+PREHOOK: query: alter table test_alter2 replace columns (id double)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@test_alter2
+PREHOOK: Output: default@test_alter2
+POSTHOOK: query: alter table test_alter2 replace columns (id double)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@test_alter2
+POSTHOOK: Output: default@test_alter2
+PREHOOK: query: select * from test_alter2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_alter2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_alter2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_alter2
+#### A masked pattern was here ####
+1.5
+PREHOOK: query: create table test_alter3 (id1 tinyint, id2 smallint, id3 int, id4 bigint) stored as parquet
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test_alter3
+POSTHOOK: query: create table test_alter3 (id1 tinyint, id2 smallint, id3 int, id4 bigint) stored as parquet
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test_alter3
+PREHOOK: query: insert into test_alter3 values (10, 20, 30, 40)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@test_alter3
+POSTHOOK: query: insert into test_alter3 values (10, 20, 30, 40)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@test_alter3
+POSTHOOK: Lineage: test_alter3.id1 SCRIPT []
+POSTHOOK: Lineage: test_alter3.id2 SCRIPT []
+POSTHOOK: Lineage: test_alter3.id3 SCRIPT []
+POSTHOOK: Lineage: test_alter3.id4 SCRIPT []
+PREHOOK: query: alter table test_alter3 replace columns (id1 smallint, id2 int, id3 bigint, id4 decimal(10,4))
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@test_alter3
+PREHOOK: Output: default@test_alter3
+POSTHOOK: query: alter table test_alter3 replace columns (id1 smallint, id2 int, id3 bigint, id4 decimal(10,4))
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@test_alter3
+POSTHOOK: Output: default@test_alter3
+PREHOOK: query: select id1, id2, id3 from test_alter3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_alter3
+#### A masked pattern was here ####
+POSTHOOK: query: select id1, id2, id3 from test_alter3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_alter3
+#### A masked pattern was here ####
+10 20 30