diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/AbstractParquetRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/AbstractParquetRecordReader.java new file mode 100644 index 0000000..b288a21 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/AbstractParquetRecordReader.java @@ -0,0 +1,169 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.parquet; + +import com.google.common.base.Strings; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter; +import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.RowGroupFilter; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.ParquetInputSplit; +import org.apache.parquet.hadoop.api.InitContext; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class AbstractParquetRecordReader { + public static final Logger LOG = LoggerFactory.getLogger(AbstractParquetRecordReader.class); + + protected Path file; + protected ProjectionPusher projectionPusher; + protected boolean skipTimestampConversion = false; + protected SerDeStats serDeStats; + protected JobConf jobConf; + + protected int schemaSize; + protected List filtedBlocks; + protected ParquetFileReader reader; + + /** + * gets a ParquetInputSplit corresponding to a split given by Hive + * + * @param oldSplit The split given by Hive + * @param conf The JobConf of the Hive job + * @return a ParquetInputSplit corresponding to the oldSplit + * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file + */ + @SuppressWarnings("deprecation") + protected ParquetInputSplit getSplit( + final org.apache.hadoop.mapred.InputSplit oldSplit, + final JobConf conf + ) throws IOException { + ParquetInputSplit split; + if (oldSplit instanceof FileSplit) { + final Path finalPath = ((FileSplit) oldSplit).getPath(); + jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent()); + + final ParquetMetadata parquetMetadata = 
ParquetFileReader.readFooter(jobConf, finalPath); + final List<BlockMetaData> blocks = parquetMetadata.getBlocks(); + final FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); + + final ReadSupport.ReadContext + readContext = new DataWritableReadSupport().init(new InitContext(jobConf, + null, fileMetaData.getSchema())); + + // Compute stats + for (BlockMetaData bmd : blocks) { + serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount()); + serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize()); + } + + schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata() + .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount(); + final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>(); + final long splitStart = ((FileSplit) oldSplit).getStart(); + final long splitLength = ((FileSplit) oldSplit).getLength(); + for (final BlockMetaData block : blocks) { + final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); + if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) { + splitGroup.add(block); + } + } + if (splitGroup.isEmpty()) { + LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit); + return null; + } + + FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema()); + if (filter != null) { + filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema()); + if (filtedBlocks.isEmpty()) { + LOG.debug("All row groups are dropped due to filter predicates"); + return null; + } + + long droppedBlocks = splitGroup.size() - filtedBlocks.size(); + if (droppedBlocks > 0) { + LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate"); + } + } else { + filtedBlocks = splitGroup; + } + + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) { + skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr"); + } + split = new ParquetInputSplit(finalPath, + splitStart, + splitLength, + ((FileSplit) oldSplit).getLocations(), + filtedBlocks, + readContext.getRequestedSchema().toString(), + fileMetaData.getSchema().toString(), + fileMetaData.getKeyValueMetaData(), + readContext.getReadSupportMetadata()); + return split; + } else { + throw new IllegalArgumentException("Unknown split type: " + oldSplit); + } + } + + public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { + SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf); + if (sarg == null) { + return null; + } + + // Create the Parquet FilterPredicate without including columns that do not exist + // on the schema (such as partition columns). + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema); + if (p != null) { + // Filter may have sensitive information. Do not send to debug. + LOG.debug("PARQUET predicate push down generated."); + ParquetInputFormat.setFilterPredicate(conf, p); + return FilterCompat.get(p); + } else { + // Filter may have sensitive information. Do not send to debug. 
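// Editor's note: the row-group selection in getSplit() above can be hard to follow inside the
// diff, so here is a minimal, self-contained sketch of the rule it applies -- a row group is
// read by this split when the first data page of its first column starts inside
// [splitStart, splitStart + splitLength). The helper and its names are illustrative only,
// not part of the patch.
class RowGroupSelectionSketch {
  static java.util.List<Integer> selectRowGroups(long[] firstDataPageOffsets,
                                                 long splitStart, long splitLength) {
    java.util.List<Integer> selected = new java.util.ArrayList<>();
    for (int i = 0; i < firstDataPageOffsets.length; i++) {
      long offset = firstDataPageOffsets[i];
      if (offset >= splitStart && offset < splitStart + splitLength) {
        selected.add(i);                 // row group i belongs to this split
      }
    }
    return selected;                     // empty => getSplit() logs a warning and returns null
  }

  public static void main(String[] args) {
    // Row groups starting at bytes 4, 1_000_004 and 2_000_004; a split covering the first
    // megabyte picks up only the first one.
    System.out.println(selectRowGroups(new long[]{4L, 1_000_004L, 2_000_004L}, 0L, 1_000_000L));
  }
}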
+ LOG.debug("No PARQUET predicate push down is generated."); + return null; + } + } + + public List getFiltedBlocks() { + return filtedBlocks; + } + + public SerDeStats getStats() { + return serDeStats; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java index 5b65e5c..f4fadbb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java @@ -14,6 +14,8 @@ package org.apache.hadoop.hive.ql.io.parquet; import java.io.IOException; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -34,7 +36,8 @@ * NOTE: With HIVE-9235 we removed "implements VectorizedParquetInputFormat" since all data types * are not currently supported. Removing the interface turns off vectorization. */ -public class MapredParquetInputFormat extends FileInputFormat { +public class MapredParquetInputFormat extends FileInputFormat + implements VectorizedInputFormatInterface { private static final Logger LOG = LoggerFactory.getLogger(MapredParquetInputFormat.class); @@ -48,7 +51,7 @@ public MapredParquetInputFormat() { protected MapredParquetInputFormat(final ParquetInputFormat inputFormat) { this.realInput = inputFormat; - vectorizedSelf = new VectorizedParquetInputFormat(inputFormat); + vectorizedSelf = new VectorizedParquetInputFormat(); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -69,8 +72,7 @@ protected MapredParquetInputFormat(final ParquetInputFormat input if (LOG.isDebugEnabled()) { LOG.debug("Using row-mode record reader"); } - return (RecordReader) - new ParquetRecordReaderWrapper(realInput, split, job, reporter); + return new ParquetRecordReaderWrapper(realInput, split, job, reporter); } } catch (final InterruptedException e) { throw new RuntimeException("Cannot create a RecordReaderWrapper", e); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java index 2072533..322178a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java @@ -15,147 +15,29 @@ import java.io.IOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssign; -import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssignFactory; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader; +import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; import 
org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.parquet.hadoop.ParquetInputFormat; /** * Vectorized input format for Parquet files */ -public class VectorizedParquetInputFormat extends FileInputFormat - implements VectorizedInputFormatInterface { - - private static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetInputFormat.class); - - /** - * Vectorized record reader for vectorized Parquet input format - */ - private static class VectorizedParquetRecordReader implements - RecordReader { - private static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetRecordReader.class); - - private final ParquetRecordReaderWrapper internalReader; - private VectorizedRowBatchCtx rbCtx; - private Object[] partitionValues; - private ArrayWritable internalValues; - private NullWritable internalKey; - private VectorColumnAssign[] assigners; - - public VectorizedParquetRecordReader( - ParquetInputFormat realInput, - FileSplit split, - JobConf conf, Reporter reporter) throws IOException, InterruptedException { - internalReader = new ParquetRecordReaderWrapper( - realInput, - split, - conf, - reporter); - rbCtx = Utilities.getVectorizedRowBatchCtx(conf); - int partitionColumnCount = rbCtx.getPartitionColumnCount(); - if (partitionColumnCount > 0) { - partitionValues = new Object[partitionColumnCount]; - rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues); - } - } - - @Override - public NullWritable createKey() { - internalKey = internalReader.createKey(); - return NullWritable.get(); - } - - @Override - public VectorizedRowBatch createValue() { - VectorizedRowBatch outputBatch; - outputBatch = rbCtx.createVectorizedRowBatch(); - internalValues = internalReader.createValue(); - return outputBatch; - } - - @Override - public long getPos() throws IOException { - return internalReader.getPos(); - } +public class VectorizedParquetInputFormat + extends FileInputFormat { - @Override - public void close() throws IOException { - internalReader.close(); - } - - @Override - public float getProgress() throws IOException { - return internalReader.getProgress(); - } - - @Override - public boolean next(NullWritable key, VectorizedRowBatch outputBatch) - throws IOException { - if (assigners != null) { - assert(outputBatch.numCols == assigners.length); - } - outputBatch.reset(); - int maxSize = outputBatch.getMaxSize(); - try { - while (outputBatch.size < maxSize) { - if (false == internalReader.next(internalKey, internalValues)) { - outputBatch.endOfFile = true; - break; - } - Writable[] writables = internalValues.get(); - - if (null == assigners) { - // Normally we'd build the assigners from the rbCtx.rowOI, but with Parquet - // we have a discrepancy between the metadata type (Eg. tinyint -> BYTE) and - // the writable value (IntWritable). see Parquet's ETypeConverter class. 
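// Editor's note (refers to setFilter() in the new AbstractParquetRecordReader above): a hedged,
// self-contained sketch of the conversion that method performs. Building the SearchArgument by
// hand here is illustrative only -- at runtime it comes from the job conf via
// ConvertAstToSearchArg.createFromConf() -- and the builder calls below assume the Hive 2.x
// SearchArgumentFactory API.
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

class PredicatePushDownSketch {
  static FilterPredicate example() {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message hive_schema { optional int64 id; optional binary name; }");
    SearchArgument sarg = SearchArgumentFactory.newBuilder()
        .startAnd()
          .lessThan("id", PredicateLeaf.Type.LONG, 100L)          // WHERE id < 100
        .end()
        .build();
    // Columns absent from the file schema (for example partition columns) are dropped by the
    // converter, so only pushable leaves end up in the Parquet predicate.
    return ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema);
  }
}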
- assigners = VectorColumnAssignFactory.buildAssigners(outputBatch, writables); - } - - for(int i = 0; i < writables.length; ++i) { - assigners[i].assignObjectValue(writables[i], outputBatch.size); - } - ++outputBatch.size; - } - } catch (HiveException e) { - throw new RuntimeException(e); - } - return outputBatch.size > 0; - } + public VectorizedParquetInputFormat() { } - private final ParquetInputFormat realInput; - - public VectorizedParquetInputFormat(ParquetInputFormat realInput) { - this.realInput = realInput; - } - - @SuppressWarnings("unchecked") @Override public RecordReader getRecordReader( - InputSplit split, JobConf conf, Reporter reporter) throws IOException { - try { - return (RecordReader) - new VectorizedParquetRecordReader(realInput, (FileSplit) split, conf, reporter); - } catch (final InterruptedException e) { - throw new RuntimeException("Cannot create a VectorizedParquetRecordReader", e); - } + InputSplit inputSplit, + JobConf jobConf, + Reporter reporter) throws IOException { + return new VectorizedParquetRecordReader(inputSplit, jobConf); } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java index 3e38cc7..6b81914 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java @@ -65,7 +65,7 @@ * @param columns comma separated list of columns * @return list with virtual columns removed */ - private static List getColumnNames(final String columns) { + public static List getColumnNames(final String columns) { return (List) VirtualColumn. removeVirtualColumns(StringUtils.getStringCollection(columns)); } @@ -77,7 +77,7 @@ * @param types Comma separated list of types * @return A list of TypeInfo objects. 
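// Editor's note: a hedged usage sketch, not part of the patch. With this change
// VectorizedParquetInputFormat no longer wraps a ParquetInputFormat delegate; a caller obtains a
// reader that emits VectorizedRowBatch values directly. This only works with a JobConf that Hive
// has already populated (column projection plus the serialized MapWork needed by
// Utilities.getVectorizedRowBatchCtx); the file path and length below are placeholders.
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.parquet.VectorizedParquetInputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

class VectorizedReadSketch {
  static void readAll(JobConf hiveConf, Path parquetFile, long fileLength) throws IOException {
    FileSplit split = new FileSplit(parquetFile, 0, fileLength, (String[]) null);
    RecordReader<NullWritable, VectorizedRowBatch> reader =
        new VectorizedParquetInputFormat().getRecordReader(split, hiveConf, Reporter.NULL);
    NullWritable key = reader.createKey();
    VectorizedRowBatch batch = reader.createValue();
    while (reader.next(key, batch)) {
      // each call fills up to VectorizedRowBatch.DEFAULT_SIZE rows; consume batch.cols[...] here
    }
    reader.close();
  }
}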
*/ - private static List getColumnTypes(final String types) { + public static List getColumnTypes(final String types) { return TypeInfoUtils.getTypeInfosFromTypeString(types); } @@ -135,7 +135,7 @@ private static Type getProjectedType(TypeInfo colType, Type fieldType) { ((StructTypeInfo) colType).getAllStructFieldNames(), ((StructTypeInfo) colType).getAllStructFieldTypeInfos() ); - + Type[] typesArray = groupFields.toArray(new Type[0]); return Types.buildGroup(fieldType.getRepetition()) .addFields(typesArray) diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java index d2e1b13..d1cfb0c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java @@ -14,24 +14,20 @@ package org.apache.hadoop.hive.ql.io.parquet.read; import java.io.IOException; -import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.hive.ql.io.parquet.AbstractParquetRecordReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher; -import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; -import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; @@ -39,25 +35,13 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.parquet.filter2.compat.FilterCompat; -import org.apache.parquet.filter2.compat.RowGroupFilter; -import org.apache.parquet.filter2.predicate.FilterPredicate; -import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.parquet.hadoop.ParquetInputSplit; -import org.apache.parquet.hadoop.api.InitContext; -import org.apache.parquet.hadoop.api.ReadSupport.ReadContext; import org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.FileMetaData; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.ContextUtil; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; -import com.google.common.base.Strings; - -public class ParquetRecordReaderWrapper implements RecordReader, - StatsProvidingRecordReader { +public class ParquetRecordReaderWrapper extends AbstractParquetRecordReader + implements RecordReader, StatsProvidingRecordReader { public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderWrapper.class); private final long splitLen; // for getPos() @@ -68,12 +52,6 @@ private ArrayWritable valueObj = null; private boolean firstRecord = false; private boolean eof = false; - private int schemaSize; - private boolean skipTimestampConversion = false; - private JobConf jobConf; - private final ProjectionPusher projectionPusher; - 
private List filtedBlocks; - private final SerDeStats serDeStats; public ParquetRecordReaderWrapper( final ParquetInputFormat newInputFormat, @@ -137,27 +115,6 @@ public ParquetRecordReaderWrapper( } } - public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { - SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf); - if (sarg == null) { - return null; - } - - // Create the Parquet FilterPredicate without including columns that do not exist - // on the shema (such as partition columns). - FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema); - if (p != null) { - // Filter may have sensitive information. Do not send to debug. - LOG.debug("PARQUET predicate push down generated."); - ParquetInputFormat.setFilterPredicate(conf, p); - return FilterCompat.get(p); - } else { - // Filter may have sensitive information. Do not send to debug. - LOG.debug("No PARQUET predicate push down is generated."); - return null; - } - } - @Override public void close() throws IOException { if (realReader != null) { @@ -227,94 +184,4 @@ public boolean next(final NullWritable key, final ArrayWritable value) throws IO throw new IOException(e); } } - - /** - * gets a ParquetInputSplit corresponding to a split given by Hive - * - * @param oldSplit The split given by Hive - * @param conf The JobConf of the Hive job - * @return a ParquetInputSplit corresponding to the oldSplit - * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file - */ - @SuppressWarnings("deprecation") - protected ParquetInputSplit getSplit( - final InputSplit oldSplit, - final JobConf conf - ) throws IOException { - ParquetInputSplit split; - if (oldSplit instanceof FileSplit) { - final Path finalPath = ((FileSplit) oldSplit).getPath(); - jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent()); - - final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath); - final List blocks = parquetMetadata.getBlocks(); - final FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); - - final ReadContext readContext = new DataWritableReadSupport().init(new InitContext(jobConf, - null, fileMetaData.getSchema())); - - // Compute stats - for (BlockMetaData bmd : blocks) { - serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount()); - serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize()); - } - - schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata() - .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount(); - final List splitGroup = new ArrayList(); - final long splitStart = ((FileSplit) oldSplit).getStart(); - final long splitLength = ((FileSplit) oldSplit).getLength(); - for (final BlockMetaData block : blocks) { - final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); - if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) { - splitGroup.add(block); - } - } - if (splitGroup.isEmpty()) { - LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit); - return null; - } - - FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema()); - if (filter != null) { - filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema()); - if (filtedBlocks.isEmpty()) { - LOG.debug("All row groups are dropped due to filter predicates"); - return null; - } - - long droppedBlocks = splitGroup.size() - 
filtedBlocks.size(); - if (droppedBlocks > 0) { - LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate"); - } - } else { - filtedBlocks = splitGroup; - } - - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) { - skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr"); - } - split = new ParquetInputSplit(finalPath, - splitStart, - splitLength, - ((FileSplit) oldSplit).getLocations(), - filtedBlocks, - readContext.getRequestedSchema().toString(), - fileMetaData.getSchema().toString(), - fileMetaData.getKeyValueMetaData(), - readContext.getReadSupportMetadata()); - return split; - } else { - throw new IllegalArgumentException("Unknown split type: " + oldSplit); - } - } - - public List getFiltedBlocks() { - return filtedBlocks; - } - - @Override - public SerDeStats getStats() { - return serDeStats; - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java new file mode 100644 index 0000000..69d1ae9 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java @@ -0,0 +1,712 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import jodd.datetime.JDateTime; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; +import org.apache.parquet.example.data.simple.NanoTime; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.Type; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Calendar; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; + +import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; +import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; + +public class VectorizedColumnReader { + static final long NANOS_PER_HOUR = TimeUnit.HOURS.toNanos(1); + static final long NANOS_PER_MINUTE = TimeUnit.MINUTES.toNanos(1); + static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1); + static final long NANOS_PER_DAY = TimeUnit.DAYS.toNanos(1); + private boolean skipTimestampConversion = false; + private static final ThreadLocal parquetGMTCalendar = new ThreadLocal<>(); + private static final ThreadLocal parquetLocalCalendar = new ThreadLocal<>(); + + /** + * Total number of values read. + */ + private long valuesRead; + + /** + * value that indicates the end of the current page. That is, + * if valuesRead == endOfPageValueCount, we are at the end of the page. + */ + private long endOfPageValueCount; + + /** + * The dictionary, if this column has dictionary encoding. + */ + private final Dictionary dictionary; + + /** + * If true, the current page is dictionary encoded. + */ + private boolean isCurrentPageDictionaryEncoded; + + /** + * Maximum definition level for this column. + */ + private final int maxDefLevel; + + private int definitionLevel; + private int repetitionLevel; + + /** + * Repetition/Definition/Value readers. + */ + private IntIterator repetitionLevelColumn; + private IntIterator definitionLevelColumn; + private ValuesReader dataColumn; + + // Only set if vectorized decoding is true. This is used instead of the row by row decoding + // with `definitionLevelColumn`. + private VectorizedRleValuesReader defColumn; + + /** + * Total number of values in this column (in this row group). 
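// Editor's note: a minimal, self-contained sketch of the bookkeeping described by the fields
// above. readBatch() (further below) carves a request for `total` values into per-page chunks
// using leftInPage = endOfPageValueCount - valuesRead, loading a new page only when the current
// one is exhausted. All names and page sizes here are illustrative.
class PageChunkingSketch {
  public static void main(String[] args) {
    long valuesRead = 0;
    long endOfPageValueCount = 0;
    int[] pageSizes = {300, 300, 300};     // three data pages of 300 values each
    int nextPage = 0;

    int total = 700;                        // the caller asks for 700 values
    while (total > 0) {
      if (endOfPageValueCount - valuesRead == 0) {    // consume(): current page exhausted
        endOfPageValueCount += pageSizes[nextPage++]; // readPage()
      }
      int leftInPage = (int) (endOfPageValueCount - valuesRead);
      int num = Math.min(total, leftInPage);          // values decoded from the current page
      System.out.println("decode " + num + " values from page " + (nextPage - 1));
      valuesRead += num;
      total -= num;
    }
  }
}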
+ */ + private final long totalValueCount; + + /** + * Total values in the current page. + */ + private int pageValueCount; + + private final PageReader pageReader; + private final ColumnDescriptor descriptor; + private final Type type; + + public VectorizedColumnReader( + ColumnDescriptor descriptor, + PageReader pageReader, + boolean skipTimestampConversion, + Type type) throws IOException { + this.descriptor = descriptor; + this.type = type; + this.pageReader = pageReader; + this.maxDefLevel = descriptor.getMaxDefinitionLevel(); + this.skipTimestampConversion = skipTimestampConversion; + + DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); + if (dictionaryPage != null) { + try { + this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage); + this.isCurrentPageDictionaryEncoded = true; + } catch (IOException e) { + throw new IOException("could not decode the dictionary for " + descriptor, e); + } + } else { + this.dictionary = null; + this.isCurrentPageDictionaryEncoded = false; + } + this.totalValueCount = pageReader.getTotalValueCount(); + if (totalValueCount == 0) { + throw new IOException("totalValueCount == 0"); + } + } + + void readBatch( + int total, + ColumnVector column, + TypeInfo columnType) throws IOException { + + int rowId = 0; + while (total > 0) { + // Compute the number of values we want to read in this page. + consume(); + int leftInPage = (int) (endOfPageValueCount - valuesRead); + int num = Math.min(total, leftInPage); + if (isCurrentPageDictionaryEncoded) { + LongColumnVector dictionaryIds = new LongColumnVector(); + // Read and decode dictionary ids. + readIntegers(num, dictionaryIds, rowId, maxDefLevel, + (VectorizedValuesReader) dataColumn); + decodeDictionaryIds(rowId, num, column, dictionaryIds); + } else { + // assign values in vector + PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType; + switch (primitiveColumnType.getPrimitiveCategory()) { + case INT: + case BYTE: + case SHORT: + readIntegers(num, (LongColumnVector) column, rowId, maxDefLevel, + (VectorizedValuesReader) dataColumn); + break; + case DATE: + case INTERVAL_YEAR_MONTH: + case LONG: + readLongs(num, (LongColumnVector) column, rowId, maxDefLevel, + (VectorizedValuesReader) dataColumn); + break; + case BOOLEAN: + readBooleans(num, (LongColumnVector) column, rowId, maxDefLevel, + (VectorizedValuesReader) dataColumn); + break; + case DOUBLE: + readDoubles(num, (DoubleColumnVector) column, rowId, maxDefLevel, + (VectorizedValuesReader) dataColumn); + break; + case BINARY: + case STRING: + case CHAR: + case VARCHAR: + readBinarys(num, (BytesColumnVector) column, rowId, maxDefLevel, + (VectorizedValuesReader) dataColumn); + break; + case FLOAT: + readFloats(num, (DoubleColumnVector) column, rowId, maxDefLevel, + (VectorizedValuesReader) dataColumn); + break; + case DECIMAL: + readDecimal(num, (DecimalColumnVector) column, rowId, maxDefLevel, + (VectorizedValuesReader) dataColumn); + case INTERVAL_DAY_TIME: + case TIMESTAMP: + break; + default: + throw new IOException("Unsupported"); + } + } + rowId += num; + total -= num; + } + } + + private void consume() throws IOException { + int leftInPage = (int) (endOfPageValueCount - valuesRead); + if (leftInPage == 0) { + readPage(); + } + } + + public void readIntegers( + int total, + LongColumnVector c, + int rowId, + int level, + VectorizedValuesReader data) throws IOException { + int left = total; + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= 
level) { + c.vector[rowId] = data.readInteger(); + c.isNull[rowId] = false; + if(c.isRepeating){ + c.isRepeating = (c.vector[0] == c.vector[rowId]); + } + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + public void readDoubles( + int total, + DoubleColumnVector c, + int rowId, + int level, + VectorizedValuesReader data) throws IOException { + int left = total; + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= level) { + c.vector[rowId] = data.readDouble(); + c.isNull[rowId] = false; + if(c.isRepeating){ + c.isRepeating = (c.vector[0] == c.vector[rowId]); + } + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + public void readBooleans( + int total, + LongColumnVector c, + int rowId, + int level, + VectorizedValuesReader data) throws IOException { + int left = total; + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= level) { + c.vector[rowId] = data.readBoolean() ? 1 : 0; + c.isNull[rowId] = false; + if(c.isRepeating){ + c.isRepeating = (c.vector[0] == c.vector[rowId]); + } + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + public void readLongs( + int total, + LongColumnVector c, + int rowId, + int level, + VectorizedValuesReader data) throws IOException { + int left = total; + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= level) { + c.vector[rowId] = data.readLong(); + c.isNull[rowId] = false; + if(c.isRepeating){ + c.isRepeating = (c.vector[0] == c.vector[rowId]); + } + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + public void readFloats( + int total, + DoubleColumnVector c, + int rowId, + int level, + VectorizedValuesReader data) throws IOException { + int left = total; + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= level) { + c.vector[rowId] = data.readFloat(); + c.isNull[rowId] = false; + if(c.isRepeating){ + c.isRepeating = (c.vector[0] == c.vector[rowId]); + } + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + public void readDecimal( + int total, + DecimalColumnVector c, + int rowId, + int level, + VectorizedValuesReader data) throws IOException { + int left = total; + c.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision(); + c.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale(); + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= level) { + c.vector[rowId] = new HiveDecimalWritable(data.readBytes().getBytesUnsafe(), + type.asPrimitiveType().getDecimalMetadata().getScale()); + c.isNull[rowId] = false; + if(c.isRepeating){ + c.isRepeating = (c.vector[0] == c.vector[rowId]); + } + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + public void readBinarys( + int total, + BytesColumnVector c, + int rowId, + int level, + VectorizedValuesReader data) throws IOException { + int left = total; + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= level) { + Binary binary = data.readBytes(); + c.setVal(rowId, binary.getBytesUnsafe()); + c.isNull[rowId] = 
false; + if(c.isRepeating){ + c.isRepeating = Arrays.equals(binary.getBytesUnsafe(), + ArrayUtils.subarray(c.vector[0], c.start[0], c.length[0])); + } + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + /** + * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`. + */ + private void decodeDictionaryIds(int rowId, int num, ColumnVector column, + LongColumnVector dictionaryIds) { + System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num); + if (column.noNulls) { + column.noNulls = dictionaryIds.noNulls ? true : false; + } + if(column.isRepeating){ + column.isRepeating = dictionaryIds.isRepeating; + } + switch (descriptor.getType()) { + case INT32: + for (int i = rowId; i < rowId + num; ++i) { + ((LongColumnVector) column).vector[i] = + dictionary.decodeToInt((int) dictionaryIds.vector[i]); + } + break; + case INT64: + for (int i = rowId; i < rowId + num; ++i) { + ((LongColumnVector) column).vector[i] = + dictionary.decodeToLong((int) dictionaryIds.vector[i]); + } + break; + case FLOAT: + for (int i = rowId; i < rowId + num; ++i) { + ((DoubleColumnVector) column).vector[i] = + dictionary.decodeToFloat((int) dictionaryIds.vector[i]); + } + break; + case DOUBLE: + for (int i = rowId; i < rowId + num; ++i) { + ((DoubleColumnVector) column).vector[i] = + dictionary.decodeToDouble((int) dictionaryIds.vector[i]); + } + break; + case INT96: + for (int i = rowId; i < rowId + num; ++i) { + ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer(); + buf.order(ByteOrder.LITTLE_ENDIAN); + long timeOfDayNanos = buf.getLong(); + int julianDay = buf.getInt(); + NanoTime nt = new NanoTime(julianDay, timeOfDayNanos); + Timestamp ts = getTimestamp(nt, skipTimestampConversion); + ((TimestampColumnVector) column).set(i, ts); + } + break; + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + for (int i = rowId; i < rowId + num; ++i) { + ((BytesColumnVector) column) + .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe()); + } + break; + default: + throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType()); + } + } + + public static Timestamp getTimestamp( + NanoTime nt, + boolean skipConversion) { + int julianDay = nt.getJulianDay(); + long nanosOfDay = nt.getTimeOfDayNanos(); + + long remainder = nanosOfDay; + julianDay += remainder / NANOS_PER_DAY; + remainder %= NANOS_PER_DAY; + if (remainder < 0) { + remainder += NANOS_PER_DAY; + julianDay--; + } + + JDateTime jDateTime = new JDateTime((double) julianDay); + Calendar calendar = getCalendar(skipConversion); + calendar.set(Calendar.YEAR, jDateTime.getYear()); + calendar.set(Calendar.MONTH, jDateTime.getMonth() - 1); //java calendar index starting at 1. + calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay()); + + int hour = (int) (remainder / (NANOS_PER_HOUR)); + remainder = remainder % (NANOS_PER_HOUR); + int minutes = (int) (remainder / (NANOS_PER_MINUTE)); + remainder = remainder % (NANOS_PER_MINUTE); + int seconds = (int) (remainder / (NANOS_PER_SECOND)); + long nanos = remainder % NANOS_PER_SECOND; + + calendar.set(Calendar.HOUR_OF_DAY, hour); + calendar.set(Calendar.MINUTE, minutes); + calendar.set(Calendar.SECOND, seconds); + Timestamp ts = new Timestamp(calendar.getTimeInMillis()); + ts.setNanos((int) nanos); + return ts; + } + + private static Calendar getCalendar(boolean skipConversion) { + Calendar calendar = skipConversion ? 
getLocalCalendar() : getGMTCalendar(); + calendar.clear(); // Reset all fields before reusing this instance + return calendar; + } + + private static Calendar getGMTCalendar() { + //Calendar.getInstance calculates the current-time needlessly, so cache an instance. + if (parquetGMTCalendar.get() == null) { + parquetGMTCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT"))); + } + return parquetGMTCalendar.get(); + } + + private static Calendar getLocalCalendar() { + if (parquetLocalCalendar.get() == null) { + parquetLocalCalendar.set(Calendar.getInstance()); + } + return parquetLocalCalendar.get(); + } + + private void readPage() throws IOException { + DataPage page = pageReader.readPage(); + // TODO: Why is this a visitor? + page.accept(new DataPage.Visitor() { + @Override + public Void visit(DataPageV1 dataPageV1) { + try { + readPageV1(dataPageV1); + return null; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Void visit(DataPageV2 dataPageV2) { + try { + readPageV2(dataPageV2); + return null; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + } + + private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException { + this.endOfPageValueCount = valuesRead + pageValueCount; + if (dataEncoding.usesDictionary()) { + this.dataColumn = null; + if (dictionary == null) { + throw new IOException( + "could not read page in col " + descriptor + + " as the dictionary was missing for encoding " + dataEncoding); + } + @SuppressWarnings("deprecation") + Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression + if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) { + throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding); + } + this.dataColumn = new VectorizedRleValuesReader(); + this.isCurrentPageDictionaryEncoded = true; + } else { + if (dataEncoding != Encoding.PLAIN) { + throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding); + } + this.dataColumn = new VectorizedPlainValuesReader(); + this.isCurrentPageDictionaryEncoded = false; + } + + try { + dataColumn.initFromPage(pageValueCount, bytes, offset); + } catch (IOException e) { + throw new IOException("could not read page in col " + descriptor, e); + } + } + + private void readPageV1(DataPageV1 page) throws IOException { + this.pageValueCount = page.getValueCount(); + ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); + ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL); + + this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader); + this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader); + // Initialize the decoders. 
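// Editor's note: a small arithmetic sketch of the INT96 timestamp decoding in getTimestamp()
// above. The nanos-of-day half of a NanoTime is split into hour/minute/second/nanos with the
// same TimeUnit-derived constants; the Julian-day-to-calendar-date step (JDateTime) is left out.
import java.util.concurrent.TimeUnit;

class NanoTimeSplitSketch {
  public static void main(String[] args) {
    long nanosOfDay = TimeUnit.HOURS.toNanos(13)
        + TimeUnit.MINUTES.toNanos(45)
        + TimeUnit.SECONDS.toNanos(30)
        + 123_456_789L;                                  // 13:45:30.123456789

    long remainder = nanosOfDay;
    int hour = (int) (remainder / TimeUnit.HOURS.toNanos(1));
    remainder %= TimeUnit.HOURS.toNanos(1);
    int minutes = (int) (remainder / TimeUnit.MINUTES.toNanos(1));
    remainder %= TimeUnit.MINUTES.toNanos(1);
    int seconds = (int) (remainder / TimeUnit.SECONDS.toNanos(1));
    long nanos = remainder % TimeUnit.SECONDS.toNanos(1);

    System.out.printf("%02d:%02d:%02d.%09d%n", hour, minutes, seconds, nanos); // 13:45:30.123456789
  }
}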
+ if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) { + throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding()); + } + int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel()); + this.defColumn = + new VectorizedRleValuesReader(bitWidth, definitionLevelColumn, repetitionLevelColumn); + try { + byte[] bytes = page.getBytes().toByteArray(); + rlReader.initFromPage(pageValueCount, bytes, 0); + int next = rlReader.getNextOffset(); + dlReader.initFromPage(pageValueCount, bytes, next); + next = dlReader.getNextOffset(); + initDataReader(page.getValueEncoding(), bytes, next); + } catch (IOException e) { + throw new IOException("could not read page " + page + " in col " + descriptor, e); + } + } + + private void readPageV2(DataPageV2 page) throws IOException { + this.pageValueCount = page.getValueCount(); + this.repetitionLevelColumn = createRLEIterator(descriptor.getMaxRepetitionLevel(), + page.getRepetitionLevels(), descriptor); + this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page + .getDefinitionLevels()); + + int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel()); + this.defColumn = + new VectorizedRleValuesReader(bitWidth, definitionLevelColumn, repetitionLevelColumn); + this.definitionLevelColumn = new ValuesReaderIntIterator(this + .defColumn); + this.defColumn.initFromBuffer(this.pageValueCount, page.getDefinitionLevels().toByteArray()); + try { + initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0); + } catch (IOException e) { + throw new IOException("could not read page " + page + " in col " + descriptor, e); + } + } + + private void readRepetitionAndDefinitionLevels() { + repetitionLevel = repetitionLevelColumn.nextInt(); + definitionLevel = definitionLevelColumn.nextInt(); + valuesRead++; + } + + private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) { + try { + if (maxLevel == 0) { + return new NullIntIterator(); + } + return new RLEIntIterator( + new RunLengthBitPackingHybridDecoder(BytesUtils.getWidthFromMaxInt(maxLevel), + new ByteArrayInputStream(bytes.toByteArray()))); + } catch (IOException e) { + throw new ParquetDecodingException( + "could not read levels in page for col " + descriptor, e); + } + } + + /** + * Creates a reader for definition and repetition levels, returning an optimized one if + * the levels are not needed. + */ + protected static IntIterator createRLEIterator( + int maxLevel, + BytesInput bytes, + ColumnDescriptor descriptor) throws IOException { + try { + if (maxLevel == 0) { + return new NullIntIterator(); + } + return new RLEIntIterator( + new RunLengthBitPackingHybridDecoder(BytesUtils.getWidthFromMaxInt(maxLevel), + new ByteArrayInputStream(bytes.toByteArray()))); + } catch (IOException e) { + throw new IOException("could not read levels in page for col " + descriptor, e); + } + } + + + /** + * Utility classes to abstract over different way to read ints with different encodings. + * TODO: remove this layer of abstraction? 
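// Editor's note: a sketch of how the definition-level bit width used in readPageV1()/readPageV2()
// above is derived. BytesUtils.getWidthFromMaxInt(maxLevel) yields the number of bits needed to
// store values in [0, maxLevel]; the local helper below mirrors that contract for illustration.
class LevelBitWidthSketch {
  static int widthFromMaxInt(int maxLevel) {
    return 32 - Integer.numberOfLeadingZeros(maxLevel);
  }

  public static void main(String[] args) {
    System.out.println(widthFromMaxInt(0)); // 0 bits: required column, no definition levels stored
    System.out.println(widthFromMaxInt(1)); // 1 bit:  flat optional column (the common Hive case)
    System.out.println(widthFromMaxInt(5)); // 3 bits: deeply nested column with max level 5
  }
}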
+ */ + abstract static class IntIterator { + abstract int nextInt(); + } + + protected static final class ValuesReaderIntIterator extends IntIterator { + ValuesReader delegate; + + public ValuesReaderIntIterator(ValuesReader delegate) { + this.delegate = delegate; + } + + @Override + int nextInt() { + return delegate.readInteger(); + } + } + + protected static final class RLEIntIterator extends IntIterator { + RunLengthBitPackingHybridDecoder delegate; + + public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) { + this.delegate = delegate; + } + + @Override + int nextInt() { + try { + return delegate.readInt(); + } catch (IOException e) { + throw new ParquetDecodingException(e); + } + } + } + + protected static final class NullIntIterator extends IntIterator { + @Override + int nextInt() { return 0; } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java new file mode 100644 index 0000000..a34d3d3 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -0,0 +1,479 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.AbstractParquetRecordReader; +import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; +import org.apache.parquet.filter2.compat.FilterCompat; +import 
org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetInputSplit; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; + +import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.range; +import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; +import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; + +public class VectorizedParquetRecordReader extends AbstractParquetRecordReader + implements RecordReader<NullWritable, VectorizedRowBatch> { + public static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetRecordReader.class); + + private List<Integer> colsToInclude; + + protected MessageType fileSchema; + protected MessageType requestedSchema; + List<String> columnNamesList; + List<TypeInfo> columnTypesList; + + private VectorizedRowBatchCtx rbCtx; + + /** + * For each requested column, the reader used to read that column. This is null if the column + * is missing from the file, in which case the column is populated with nulls. + */ + private VectorizedColumnReader[] columnReaders; + + /** + * The number of rows that have been returned. + */ + private long rowsReturned; + + /** + * The number of rows that have been loaded so far, including the current in-flight row group. + */ + private long totalCountLoadedSoFar = 0; + + /** + * The total number of rows this RecordReader will eventually read. The sum of the + * rows of all the row groups. 
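// Editor's note: a hedged sketch of how the projection consumed by this reader is wired into the
// JobConf on the Hive side. IOConstants.COLUMNS / COLUMNS_TYPES carry the table columns (types
// colon-separated, as Hive serdes expect) and ColumnProjectionUtils records which columns are
// actually read -- that is what getReadColumnIDs(conf) returns into colsToInclude above. The
// column list itself is made up for the example.
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.JobConf;

class ProjectionConfSketch {
  static JobConf exampleConf() {
    JobConf conf = new JobConf();
    conf.set(IOConstants.COLUMNS, "id,name,price");
    conf.set(IOConstants.COLUMNS_TYPES, "bigint:string:double");
    ColumnProjectionUtils.appendReadColumns(conf, Arrays.asList(0, 2)); // read only id and price
    List<Integer> readColumns = ColumnProjectionUtils.getReadColumnIDs(conf); // [0, 2]
    System.out.println(readColumns);
    return conf;
  }
}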
+ */ + protected long totalRowCount; + + public VectorizedParquetRecordReader( + org.apache.hadoop.mapred.InputSplit oldInputSplit, + JobConf conf) { + try { + serDeStats = new SerDeStats(); + projectionPusher = new ProjectionPusher(); + initialize(oldInputSplit, conf); + colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf); + rbCtx = Utilities.getVectorizedRowBatchCtx(conf); + } catch (Throwable e) { + LOG.error("Failed to create the vectorized reader due to exception " + e); + throw new RuntimeException(e); + } + } + + public VectorizedParquetRecordReader( + InputSplit inputSplit, + JobConf conf) { + try { + serDeStats = new SerDeStats(); + projectionPusher = new ProjectionPusher(); + initialize(inputSplit, conf); + colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf); + rbCtx = new VectorizedRowBatchCtx(); + rbCtx.init(createStructObjectInspector(), new String[0]); + } catch (Throwable e) { + LOG.error("Failed to create the vectorized reader due to exception " + e); + throw new RuntimeException(e); + } + } + + private StructObjectInspector createStructObjectInspector() { + // Create row related objects + TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList); + return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo); + } + + + public void initialize( + org.apache.hadoop.mapred.InputSplit oldInputSplit, + JobConf configuration) throws IOException, InterruptedException { + initialize(getSplit(oldInputSplit, configuration), configuration); + } + + public void initialize( + InputSplit oldSplit, + JobConf configuration) throws IOException, InterruptedException { + jobConf = configuration; + ParquetMetadata footer; + List blocks; + ParquetInputSplit split = (ParquetInputSplit) oldSplit; + boolean indexAccess = + configuration.getBoolean(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS, false); + this.file = split.getPath(); + long[] rowGroupOffsets = split.getRowGroupOffsets(); + + String columnNames = configuration.get(IOConstants.COLUMNS); + columnNamesList = DataWritableReadSupport.getColumnNames(columnNames); + String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES); + columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes); + + // if task.side.metadata is set, rowGroupOffsets is null + if (rowGroupOffsets == null) { + // then we need to apply the predicate push down filter + footer = readFooter(configuration, file, range(split.getStart(), split.getEnd())); + MessageType fileSchema = footer.getFileMetaData().getSchema(); + FilterCompat.Filter filter = getFilter(configuration); + blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema); + } else { + // otherwise we find the row groups that were selected on the client + footer = readFooter(configuration, file, NO_FILTER); + Set offsets = new HashSet<>(); + for (long offset : rowGroupOffsets) { + offsets.add(offset); + } + blocks = new ArrayList<>(); + for (BlockMetaData block : footer.getBlocks()) { + if (offsets.contains(block.getStartingPos())) { + blocks.add(block); + } + } + // verify we found them all + if (blocks.size() != rowGroupOffsets.length) { + long[] foundRowGroupOffsets = new long[footer.getBlocks().size()]; + for (int i = 0; i < foundRowGroupOffsets.length; i++) { + foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos(); + } + // this should never happen. + // provide a good error message in case there's a bug + throw new IllegalStateException( + "All the offsets listed in the split should be found in the file." 
+ + " expected: " + Arrays.toString(rowGroupOffsets) + + " found: " + blocks + + " out of: " + Arrays.toString(foundRowGroupOffsets) + + " in range " + split.getStart() + ", " + split.getEnd()); + } + } + + for (BlockMetaData block : blocks) { + this.totalRowCount += block.getRowCount(); + } + this.fileSchema = footer.getFileMetaData().getSchema(); + + + MessageType tableSchema; + if (indexAccess) { + List indexSequence = new ArrayList(); + + // Generates a sequence list of indexes + for(int i = 0; i < columnNamesList.size(); i++) { + indexSequence.add(i); + } + + tableSchema = getSchemaByIndex(fileSchema, columnNamesList, indexSequence); + } else { + + tableSchema = getSchemaByName(fileSchema, columnNamesList, columnTypesList); + } +// this.hiveTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList); + + List indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration); + if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) { + requestedSchema = getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted); + }else{ + requestedSchema = fileSchema; + } + + this.reader = new ParquetFileReader( + configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns()); + } + + /** + * Searchs column names by name on a given Parquet schema, and returns its corresponded + * Parquet schema types. + * + * @param schema Group schema where to search for column names. + * @param colNames List of column names. + * @param colTypes List of column types. + * @return List of GroupType objects of projected columns. + */ + private static List getProjectedGroupFields(GroupType schema, List colNames, List colTypes) { + List schemaTypes = new ArrayList(); + + ListIterator columnIterator = colNames.listIterator(); + while (columnIterator.hasNext()) { + TypeInfo colType = colTypes.get(columnIterator.nextIndex()); + String colName = columnIterator.next(); + + Type fieldType = getFieldTypeIgnoreCase(schema, colName); + if (fieldType == null) { + schemaTypes.add(Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(colName)); + } else { + schemaTypes.add(getProjectedType(colType, fieldType)); + } + } + + return schemaTypes; + } + + private static Type getProjectedType(TypeInfo colType, Type fieldType) { + switch (colType.getCategory()) { + case STRUCT: + List groupFields = getProjectedGroupFields( + fieldType.asGroupType(), + ((StructTypeInfo) colType).getAllStructFieldNames(), + ((StructTypeInfo) colType).getAllStructFieldTypeInfos() + ); + + Type[] typesArray = groupFields.toArray(new Type[0]); + return Types.buildGroup(fieldType.getRepetition()) + .addFields(typesArray) + .named(fieldType.getName()); + case LIST: + TypeInfo elemType = ((ListTypeInfo) colType).getListElementTypeInfo(); + if (elemType.getCategory() == ObjectInspector.Category.STRUCT) { + Type subFieldType = fieldType.asGroupType().getType(0); + if (!subFieldType.isPrimitive()) { + String subFieldName = subFieldType.getName(); + Text name = new Text(subFieldName); + if (name.equals(ParquetHiveSerDe.ARRAY) || name.equals(ParquetHiveSerDe.LIST)) { + subFieldType = new GroupType(Type.Repetition.REPEATED, subFieldName, + getProjectedType(elemType, subFieldType.asGroupType().getType(0))); + } else { + subFieldType = getProjectedType(elemType, subFieldType); + } + return Types.buildGroup(Type.Repetition.OPTIONAL).as(OriginalType.LIST).addFields( + subFieldType).named(fieldType.getName()); + } + } + break; + default: + } + return fieldType; + } + + + /** 
+ * Searchs for a fieldName into a parquet GroupType by ignoring string case. + * GroupType#getType(String fieldName) is case sensitive, so we use this method. + * + * @param groupType Group of field types where to search for fieldName + * @param fieldName The field what we are searching + * @return The Type object of the field found; null otherwise. + */ + private static Type getFieldTypeIgnoreCase(GroupType groupType, String fieldName) { + for (Type type : groupType.getFields()) { + if (type.getName().equalsIgnoreCase(fieldName)) { + return type; + } + } + + return null; + } + + + /** + * Searchs column names by name on a given Parquet message schema, and returns its projected + * Parquet schema types. + * + * @param schema Message type schema where to search for column names. + * @param colNames List of column names. + * @param colTypes List of column types. + * @return A MessageType object of projected columns. + */ + private static MessageType getSchemaByName(MessageType schema, List colNames, List colTypes) { + List projectedFields = getProjectedGroupFields(schema, colNames, colTypes); + Type[] typesArray = projectedFields.toArray(new Type[0]); + + return Types.buildMessage() + .addFields(typesArray) + .named(schema.getName()); + } + + /** + * Searchs column names by index on a given Parquet file schema, and returns its corresponded + * Parquet schema types. + * + * @param schema Message schema where to search for column names. + * @param colNames List of column names. + * @param colIndexes List of column indexes. + * @return A MessageType object of the column names found. + */ + private static MessageType getSchemaByIndex(MessageType schema, List colNames, List colIndexes) { + List schemaTypes = new ArrayList(); + + for (Integer i : colIndexes) { + if (i < colNames.size()) { + if (i < schema.getFieldCount()) { + schemaTypes.add(schema.getType(i)); + } else { + //prefixing with '_mask_' to ensure no conflict with named + //columns in the file schema + schemaTypes.add(Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named("_mask_" + colNames.get(i))); + } + } + } + return new MessageType(schema.getName(), schemaTypes); + } + + /** + * columnBatch object that is used for batch decoding. This is created on first use and triggers + * batched decoding. It is not valid to interleave calls to the batched interface with the row + * by row RecordReader APIs. + * This is only enabled with additional flags for development. This is still a work in progress + * and currently unsupported cases will fail with potentially difficult to diagnose errors. + * This should be only turned on for development to work on this feature. + * + * When this is set, the code will branch early on in the RecordReader APIs. There is no shared + * code between the path that uses the MR decoders and the vectorized ones. + * + * TODOs: + * - Implement v2 page formats (just make sure we create the correct decoders). + */ + private VectorizedRowBatch columnarBatch; + + @Override + public boolean next( + NullWritable nullWritable, + VectorizedRowBatch vectorizedRowBatch) throws IOException { + columnarBatch = vectorizedRowBatch; + return nextBatch(); + } + + /** + * Advances to the next batch of rows. Returns false if there are no more. 
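// Editor's note: a hedged consumer-side sketch of walking a batch produced by nextBatch() below,
// honouring isRepeating, noNulls and isNull the way Hive's vectorized operators do. Treating
// column 0 as a long column is an assumption made only for this example.
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

class BatchConsumerSketch {
  static long sumColumn0(VectorizedRowBatch batch) {
    LongColumnVector col = (LongColumnVector) batch.cols[0];
    long sum = 0;
    for (int i = 0; i < batch.size; i++) {
      int row = col.isRepeating ? 0 : i;       // a repeating vector stores its single value at 0
      if (col.noNulls || !col.isNull[row]) {
        sum += col.vector[row];
      }
    }
    return sum;
  }
}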
+ */ + private boolean nextBatch() throws IOException { + initRowBatch(); + columnarBatch.reset(); + if (rowsReturned >= totalRowCount) { + return false; + } + checkEndOfRowGroup(); + + int num = (int) Math.min(VectorizedRowBatch.DEFAULT_SIZE, totalCountLoadedSoFar - rowsReturned); + for (int i = 0; i < columnReaders.length; ++i) { + if (columnReaders[i] == null) { + continue; + } + columnarBatch.cols[colsToInclude.get(i)].isRepeating = true; + columnReaders[i].readBatch(num, columnarBatch.cols[colsToInclude.get(i)], + columnTypesList.get(colsToInclude.get(i))); + } + rowsReturned += num; + columnarBatch.size = num; + return true; + } + + private void initRowBatch() { + if (columnarBatch == null) { + if (rbCtx != null) { + columnarBatch = rbCtx.createVectorizedRowBatch(); + } else { + // test only + rbCtx = new VectorizedRowBatchCtx(); + try { + rbCtx.init(createStructObjectInspector(), new String[0]); + } catch (HiveException e) { + e.printStackTrace(); + } + } + } + } + + private void checkEndOfRowGroup() throws IOException { + if (rowsReturned != totalCountLoadedSoFar) return; + PageReadStore pages = reader.readNextRowGroup(); + if (pages == null) { + throw new IOException("expecting more rows but reached last block. Read " + + rowsReturned + " out of " + totalRowCount); + } + List columns = requestedSchema.getColumns(); + List types = requestedSchema.getFields(); + columnReaders = new VectorizedColumnReader[columns.size()]; + for (int i = 0; i < columns.size(); ++i) { + columnReaders[i] = + new VectorizedColumnReader(columns.get(i), pages.getPageReader(columns.get(i)), + skipTimestampConversion, types.get(i)); + } + totalCountLoadedSoFar += pages.getRowCount(); + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public VectorizedRowBatch createValue() { + initRowBatch(); + return columnarBatch; + } + + @Override + public long getPos() throws IOException { + //TODO + return 0; + } + + @Override + public void close() throws IOException { + if (columnarBatch != null) { + columnarBatch = null; + } + } + + @Override + public float getProgress() throws IOException { + //TODO + return 0; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPlainValuesReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPlainValuesReader.java new file mode 100644 index 0000000..3025455 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPlainValuesReader.java @@ -0,0 +1,227 @@ +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import jodd.util.ArraysUtil; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.parquet.bytes.*; +import org.apache.parquet.bytes.LittleEndianDataInputStream; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader; +import org.apache.parquet.io.api.Binary; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; + +import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN; + +public class VectorizedPlainValuesReader extends ValuesReader implements VectorizedValuesReader{ + protected org.apache.parquet.bytes.LittleEndianDataInputStream in; + private ByteBitPackingValuesReader booleanStream; + private byte[] buffer; + 
private int offset; + + @Override + public void initFromPage( + int valueCount, + byte[] bytes, + int offset) throws IOException { + this.buffer = bytes; + this.in = new LittleEndianDataInputStream( + new ByteArrayInputStream(bytes, offset, bytes.length - offset)); + booleanStream = new ByteBitPackingValuesReader(1, + LITTLE_ENDIAN); + this.offset = offset; + booleanStream.initFromPage(valueCount, bytes, offset); + } + + @Override + public void skip() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean readBoolean() { + return booleanStream.readInteger() == 0 ? false : true; + } + + @Override + public int readInteger(){ + try { + return in.readInt(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public long readLong(){ + try { + return in.readLong(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public float readFloat(){ + try { + return in.readFloat(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public double readDouble(){ + try { + return in.readDouble(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void readLongs( + int total, + LongColumnVector c, + int rowId) { + try { + for (int i = 0; i < total; i++) { + c.vector[rowId + i] = in.readLong(); + if (c.isRepeating) { + c.isRepeating = (c.vector[0] == c.vector[rowId + i]); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void readFloats( + int total, + DoubleColumnVector c, + int rowId) { + try { + for (int i = 0; i < total; i++) { + c.vector[rowId + i] = in.readFloat(); + if (c.isRepeating) { + c.isRepeating = (c.vector[0] == c.vector[rowId + i]); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void readDoubles( + int total, + DoubleColumnVector c, + int rowId) { + try { + for (int i = 0; i < total; i++) { + c.vector[rowId + i] = in.readDouble(); + if (c.isRepeating) { + c.isRepeating = (c.vector[0] == c.vector[rowId + i]); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void readBinarys( + int total, + BytesColumnVector c, + int rowId) { + for (int i = 0; i < total; i++) { + try { + byte[] data = readBytes().getBytesUnsafe(); + c.setVal(rowId + i, data); + } catch (Throwable e) { + throw new RuntimeException("total " + total + " i " + i, e); + } + if (c.isRepeating) { + c.isRepeating = Arrays.equals(ArraysUtil.subarray(c.vector[0], c.start[0], c.length[0]), + ArraysUtil.subarray(c.vector[rowId + i], c.start[0], c.length[0])); + } + } + } + + @Override + public byte readByte() { + try { + return in.readByte(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Binary readBytes() { + int length = 0; + try { + length = BytesUtils.readIntLittleEndian(buffer, offset); + } catch (Throwable e) { + throw new RuntimeException("Failed at " + length + " Buffer size " + buffer.length + " " + + "offset " + offset, e); + } + int start = offset + 4; + offset = start + length; + return Binary.fromConstantByteArray(buffer, start, length); + } + + @Override + public void readBooleans( + int total, + LongColumnVector c, + int rowId) { + for (int i = 0; i < total; i++) { + c.vector[rowId + i] = readBoolean() ? 
1 : 0; + if (c.isRepeating) { + c.isRepeating = (c.vector[0] == c.vector[rowId + i]); + } + } + } + + @Override + public void readBytes( + int total, + LongColumnVector c, + int rowId) { + try { + for (int i = 0; i < total; i++) { + byte value = in.readByte(); + if (c.isRepeating) { + c.isRepeating = (c.vector[0] == value); + } + c.vector[rowId + i] = value; + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void readIntegers( + int total, + LongColumnVector c, + int rowId) { + try { + for (int i = 0; i < total; i++) { + int value = in.readInt(); + if (c.isRepeating) { + c.isRepeating = (c.vector[0] == value); + } + c.vector[rowId + i] = value; + } + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedRleValuesReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedRleValuesReader.java new file mode 100644 index 0000000..e6ebc4d --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedRleValuesReader.java @@ -0,0 +1,353 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.parquet.Preconditions; +import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.bitpacking.BytePacker; +import org.apache.parquet.column.values.bitpacking.Packer; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.api.Binary; + +import java.io.IOException; +import java.util.Arrays; + +public class VectorizedRleValuesReader extends ValuesReader implements VectorizedValuesReader { + + + // Current decoding mode. The encoded data contains groups of either run length encoded data + // (RLE) or bit packed data. Each group contains a header that indicates which group it is and + // the number of values in the group. + // More details here: https://github.com/Parquet/parquet-format/blob/master/Encodings.md + private enum MODE { + RLE, + PACKED + } + + // Encoded data. + private byte[] in; + private int end; + private int offset; + + // bit/byte width of decoded data and utility to batch unpack them. + private int bitWidth; + private int bytesWidth; + private BytePacker packer; + + // Current decoding mode and values + private MODE mode; + private int currentCount; + private int currentValue; + + private int definitionLevel; + + private VectorizedColumnReader.IntIterator repetitionLevelColumn; + private VectorizedColumnReader.IntIterator definitionLevelColumn; + + // Buffer of decoded values if the values are PACKED. + private int[] currentBuffer = new int[16]; + private int currentBufferIdx = 0; + + // If true, the bit width is fixed. 
This decoder is used in different places and this also + // controls if we need to read the bitwidth from the beginning of the data stream. + private final boolean fixedWidth; + + public VectorizedRleValuesReader() { + fixedWidth = false; + } + + public VectorizedRleValuesReader( + int bitWidth, + VectorizedColumnReader.IntIterator definitionLevelColumn, + VectorizedColumnReader.IntIterator repetitionLevelColumn) { + fixedWidth = true; + this.definitionLevelColumn = definitionLevelColumn; + this.repetitionLevelColumn = repetitionLevelColumn; +// readRepetitionAndDefinitionLevels(); + init(bitWidth); + } + + private void readRepetitionAndDefinitionLevels() { +// repetitionLevel = repetitionLevelColumn.nextInt(); + definitionLevel = definitionLevelColumn.nextInt(); + } + + @Override + public int getNextOffset() { + return this.end; + } + + @Override + public void initFromPage( + int valueCount, + byte[] page, + int start) throws IOException { + this.offset = start; + this.in = page; + if (fixedWidth) { + if (bitWidth != 0) { + int length = readIntLittleEndian(); + this.end = this.offset + length; + } + } else { + this.end = page.length; + if (this.end != this.offset) init(page[this.offset++] & 255); + } + if (bitWidth == 0) { + // 0 bit width, treat this as an RLE run of valueCount number of 0's. + this.mode = MODE.RLE; + this.currentCount = valueCount; + this.currentValue = 0; + } else { + this.currentCount = 0; + } + } + + /** + * Initializes the internal state for decoding ints of `bitWidth`. + */ + private void init(int bitWidth) { + Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32"); + this.bitWidth = bitWidth; + this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth); + this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); + } + + @Override + public void skip() { + this.readInteger(); + } + + @Override + public byte readByte() { + throw new UnsupportedOperationException("only readInts is valid."); + } + + @Override + public int readInteger() { + if (this.currentCount == 0) { this.readNextGroup(); } + + this.currentCount--; + switch (mode) { + case RLE: + return this.currentValue; + case PACKED: + return this.currentBuffer[currentBufferIdx++]; + } + throw new RuntimeException("Unreachable"); + } + + @Override + public Binary readBytes() { + throw new UnsupportedOperationException("only readInts is valid."); + } + + @Override + public void readBooleans( + int total, + LongColumnVector c, + int rowId) { + throw new UnsupportedOperationException("only readInts is valid."); + } + + @Override + public void readBytes( + int total, + LongColumnVector c, + int rowId) { + throw new UnsupportedOperationException("only readInts is valid."); + } + + // The RLE reader implements the vectorized decoding interface when used to decode dictionary + // IDs. This is different than the above APIs that decodes definitions levels along with values. + // Since this is only used to decode dictionary IDs, only decoding integers is supported. 
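+  // Illustrative sketch of that usage (hypothetical; the dictionary lookup itself happens in
+  // the column reader, which is not part of this file):
+  //   LongColumnVector ids = new LongColumnVector(num);
+  //   rleReader.readIntegers(num, ids, 0);               // the only batched read supported here
+  //   long value = dictionary.decodeToLong((int) ids.vector[0]);
+  // Every other readXxx(total, ...) method below therefore throws UnsupportedOperationException.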
+ @Override + public void readLongs( + int total, + LongColumnVector c, + int rowId) { + throw new UnsupportedOperationException("only readInts is valid."); + } + + @Override + public void readFloats( + int total, + DoubleColumnVector c, + int rowId) { + throw new UnsupportedOperationException("only readInts is valid."); + } + + @Override + public void readDoubles( + int total, + DoubleColumnVector c, + int rowId) { + throw new UnsupportedOperationException("only readInts is valid."); + } + + @Override + public void readBinarys( + int total, + BytesColumnVector c, + int rowId) { + throw new UnsupportedOperationException("only readInts is valid."); + } + + @Override + public void readIntegers( + int total, + LongColumnVector c, + int rowId) { + int left = total; + while (left > 0) { + if (this.currentCount == 0) + this.readNextGroup(); + int n = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + Arrays.fill(c.vector, rowId, rowId + n, currentValue); + if (c.isRepeating) { + c.isRepeating = (c.vector[0] == currentValue); + } + break; + case PACKED: + for (int i = 0; i < n; i++) { + c.vector[rowId + i] = currentBuffer[currentBufferIdx + i]; + } + int i = 0; + while (c.isRepeating && i < n) { + c.isRepeating = (c.vector[0] == currentBuffer[i]); + i++; + } + currentBufferIdx += n; + break; + } + rowId += n; + left -= n; + currentCount -= n; + } + } + + // Initialize the reader from a buffer. This is used for the V2 page encoding where the + // definition are in its own buffer. + public void initFromBuffer(int valueCount, byte[] data) { + this.offset = 0; + this.in = data; + this.end = data.length; + if (bitWidth == 0) { + // 0 bit width, treat this as an RLE run of valueCount number of 0's. + this.mode = MODE.RLE; + this.currentCount = valueCount; + this.currentValue = 0; + } else { + this.currentCount = 0; + } + } + + /** + * Reads the next varint encoded int. + */ + private int readUnsignedVarInt() { + int value = 0; + int shift = 0; + int b; + do { + b = in[offset++] & 255; + value |= (b & 0x7F) << shift; + shift += 7; + } while ((b & 0x80) != 0); + return value; + } + + /** + * Reads the next 4 byte little endian int. + */ + private int readIntLittleEndian() { + int ch4 = in[offset] & 255; + int ch3 = in[offset + 1] & 255; + int ch2 = in[offset + 2] & 255; + int ch1 = in[offset + 3] & 255; + offset += 4; + return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); + } + + /** + * Reads the next byteWidth little endian int. + */ + private int readIntLittleEndianPaddedOnBitWidth() { + switch (bytesWidth) { + case 0: + return 0; + case 1: + return in[offset++] & 255; + case 2: { + int ch2 = in[offset] & 255; + int ch1 = in[offset + 1] & 255; + offset += 2; + return (ch1 << 8) + ch2; + } + case 3: { + int ch3 = in[offset] & 255; + int ch2 = in[offset + 1] & 255; + int ch1 = in[offset + 2] & 255; + offset += 3; + return (ch1 << 16) + (ch2 << 8) + (ch3 << 0); + } + case 4: { + return readIntLittleEndian(); + } + } + throw new RuntimeException("Unreachable"); + } + + private int ceil8(int value) { + return (value + 7) / 8; + } + + /** + * Reads the next group. + */ + private void readNextGroup() { + int header = readUnsignedVarInt(); + this.mode = (header & 1) == 0 ? 
MODE.RLE : MODE.PACKED; + switch (mode) { + case RLE: + this.currentCount = header >>> 1; + this.currentValue = readIntLittleEndianPaddedOnBitWidth(); + return; + case PACKED: + int numGroups = header >>> 1; + this.currentCount = numGroups * 8; + int bytesToRead = ceil8(this.currentCount * this.bitWidth); + + if (this.currentBuffer.length < this.currentCount) { + this.currentBuffer = new int[this.currentCount]; + } + currentBufferIdx = 0; + int valueIndex = 0; + for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) { + this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex); + valueIndex += 8; + } + offset += bytesToRead; + return; + default: + throw new ParquetDecodingException("not a valid mode " + this.mode); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedValuesReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedValuesReader.java new file mode 100644 index 0000000..e4b6edc --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedValuesReader.java @@ -0,0 +1,27 @@ +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.parquet.io.api.Binary; + +public interface VectorizedValuesReader { + boolean readBoolean(); + byte readByte(); + int readInteger(); + long readLong(); + float readFloat(); + double readDouble(); + Binary readBytes(); + + /* + * Reads `total` values into `c` start at `c[rowId]` + */ + void readBooleans(int total, LongColumnVector c, int rowId); + void readBytes(int total, LongColumnVector c, int rowId); + void readIntegers(int total, LongColumnVector c, int rowId); + void readLongs(int total, LongColumnVector c, int rowId); + void readFloats(int total, DoubleColumnVector c, int rowId); + void readDoubles(int total, DoubleColumnVector c, int rowId); + void readBinarys(int total, BytesColumnVector c, int rowId); +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java new file mode 100644 index 0000000..55b4959 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java @@ -0,0 +1,383 @@ +package org.apache.hadoop.hive.ql.io.parquet; + +import junit.framework.Assert; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.ParquetWriter; +import 
org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Random; + +import static junit.framework.Assert.assertTrue; +import static junit.framework.TestCase.assertFalse; +import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0; +import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; +import static org.junit.Assert.assertEquals; + +public class TestVectorizedColumnReader { + + private static final int nElements = 2500; + protected static final Configuration conf = new Configuration(); + protected static final Path file = + new Path("target/test/TestParquetVectorReader/testParquetFile"); + private static String[] uniqueStrs = new String[nElements]; + private static boolean[] isNulls = new boolean[nElements]; + private static Random random = new Random(); + protected static final MessageType schema = parseMessageType( + "message test { " + + "required int32 int32_field; " + + "required int64 int64_field; " + + "required int96 int96_field; " + + "required double double_field; " + + "required float float_field; " + + "required boolean boolean_field; " + + "required fixed_len_byte_array(3) flba_field; " + + "optional fixed_len_byte_array(1) some_null_field; " + + "optional fixed_len_byte_array(1) all_null_field; " + + "optional binary binary_field; " + + "optional binary binary_field_non_repeating; " + + "} "); + + @AfterClass + public static void cleanup() throws IOException { + FileSystem fs = file.getFileSystem(conf); + if (fs.exists(file)) { + fs.delete(file, true); + } + } + + @BeforeClass + public static void prepareFile() throws IOException { + cleanup(); + + boolean dictionaryEnabled = true; + boolean validating = false; + GroupWriteSupport.setSchema(schema, conf); + SimpleGroupFactory f = new SimpleGroupFactory(schema); + ParquetWriter writer = new ParquetWriter( + file, + new GroupWriteSupport(), + GZIP, 1024*1024, 1024, 1024*1024, + dictionaryEnabled, validating, PARQUET_1_0, conf); + writeData(f, writer); + } + + protected static void writeData(SimpleGroupFactory f, ParquetWriter writer) throws IOException { + initialStrings(uniqueStrs); + for (int i = 0; i < nElements; i++) { + Group group = f.newGroup() + .append("int32_field", i) + .append("int64_field", (long) 2 * i) + .append("int96_field", Binary.fromReusedByteArray("999999999999".getBytes())) + .append("double_field", i * 1.0) + .append("float_field", ((float) (i * 2.0))) + .append("boolean_field", i % 5 == 0) + .append("flba_field", "abc"); + + if (i % 2 == 1) { + group.append("some_null_field", "x"); + } + + if (i % 13 != 1) { + int binaryLen = i % 10; + group.append("binary_field", + Binary.fromString(new String(new char[binaryLen]).replace("\0", "x"))); + } + + if (uniqueStrs[i] != null) { + group.append("binary_field_non_repeating", Binary.fromString(uniqueStrs[i])); + } + writer.write(group); + } + writer.close(); + } + + private static String getRandomStr() { + int len = random.nextInt(10); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < len; i++) { + sb.append((char) ('a' + random.nextInt(25))); + } + return sb.toString(); + } + 
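+  // Fills 'uniqueStrs' with random values; when the generated string is empty the slot stays
+  // null and is flagged in 'isNulls', which testBinaryRead later checks against vector.isNull.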
+ public static void initialStrings(String[] uniqueStrs) { + int nulls = 0; + for (int i = 0; i < uniqueStrs.length; i++) { + String str = getRandomStr(); + if (!str.isEmpty()) { + uniqueStrs[i] = str; + isNulls[i] = false; + }else{ + isNulls[i] = true; + } + } + } + + private VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf) + throws IOException, InterruptedException { + conf.set(PARQUET_READ_SCHEMA, schemaString); + Job vectorJob = new Job(conf, "read vector"); + ParquetInputFormat.setInputPaths(vectorJob, file); + ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class); + InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0); + return new VectorizedParquetRecordReader(split, new JobConf(conf)); + } + + @Test + public void testIntRead() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"int32_field"); + conf.set(IOConstants.COLUMNS_TYPES,"int"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required int32 int32_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + long c = 0; + while (reader.next(NullWritable.get(), previous)) { + LongColumnVector vector = (LongColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + assertEquals(c, vector.vector[i]); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + @Test + public void testLongRead() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"int64_field"); + conf.set(IOConstants.COLUMNS_TYPES, "bigint"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required int64 int64_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + long c = 0; + while (reader.next(NullWritable.get(), previous)) { + LongColumnVector vector = (LongColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + assertEquals(2 * c, vector.vector[i]); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + @Test + public void testDoubleRead() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"double_field"); + conf.set(IOConstants.COLUMNS_TYPES, "double"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required double double_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + long c = 0; + while (reader.next(NullWritable.get(), previous)) { + DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + assertEquals(1.0 * c, vector.vector[i], 0); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + 
} + } + + @Test + public void testFloatRead() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"float_field"); + conf.set(IOConstants.COLUMNS_TYPES, "float"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required float float_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + long c = 0; + while (reader.next(NullWritable.get(), previous)) { + DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + assertEquals((float)2.0 * c, vector.vector[i], 0); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + @Test + public void testBooleanRead() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"boolean_field"); + conf.set(IOConstants.COLUMNS_TYPES, "boolean"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required boolean boolean_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + long c = 0; + while (reader.next(NullWritable.get(), previous)) { + LongColumnVector vector = (LongColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + int e = (c % 5 == 0) ? 1 : 0; + assertEquals(e, vector.vector[i]); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + @Test + public void testBinaryReadDictionaryEncoding() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"binary_field"); + conf.set(IOConstants.COLUMNS_TYPES, "string"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required binary binary_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + int c = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + BytesColumnVector vector = (BytesColumnVector) previous.cols[0]; + boolean noNull = true; + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + if (c % 13 == 1) { + assertTrue(vector.isNull[i]); + } else { + assertFalse(vector.isNull[i]); + int binaryLen = c % 10; + String expected = new String(new char[binaryLen]).replace("\0", "x"); + String actual = new String(ArrayUtils + .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i])); + assertEquals("Failed at " + c, expected, actual); + noNull = false; + } + c++; + } + assertEquals("No Null check failed at " + c, noNull, vector.noNulls); + assertFalse(vector.isRepeating); + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + @Test + public void testBinaryRead() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"binary_field_non_repeating"); + conf.set(IOConstants.COLUMNS_TYPES, "string"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + 
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required binary binary_field_non_repeating;}", conf); + VectorizedRowBatch previous = reader.createValue(); + int c = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + BytesColumnVector vector = (BytesColumnVector) previous.cols[0]; + boolean noNull = true; + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + String actual; + assertEquals("Null assert failed at " + c, isNulls[c], vector.isNull[i]); + if (!vector.isNull[i]) { + actual = new String(ArrayUtils + .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i])); + assertEquals("failed at " + c, uniqueStrs[c], actual); + }else{ + noNull = false; + } + c++; + } + assertEquals("No Null check failed at " + c, noNull, vector.noNulls); + assertFalse(vector.isRepeating); + } + assertEquals("It doesn't exit at expected position", nElements, c); + } finally { + reader.close(); + } + + } +} diff --git ql/src/test/queries/clientpositive/parquet_types_vectorization.q ql/src/test/queries/clientpositive/parquet_types_vectorization.q new file mode 100644 index 0000000..d877b00 --- /dev/null +++ ql/src/test/queries/clientpositive/parquet_types_vectorization.q @@ -0,0 +1,84 @@ +set hive.mapred.mode=nonstrict; +DROP TABLE parquet_types_staging; +DROP TABLE parquet_types; + +set hive.vectorized.execution.enabled=true; +set hive.vectorized.execution.reduce.enabled=true; +set hive.vectorized.use.row.serde.deserialize=true; +set hive.vectorized.use.vector.serde.deserialize=true; +set hive.vectorized.execution.reduce.groupby.enabled = true; + +CREATE TABLE parquet_types_staging ( + cint int, + ctinyint tinyint, + csmallint smallint, + cfloat float, + cdouble double, + cstring1 string, + t timestamp, + cchar char(5), + cvarchar varchar(10), + cbinary string, + m1 map, + l1 array, + st1 struct, + d date +) ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +COLLECTION ITEMS TERMINATED BY ',' +MAP KEYS TERMINATED BY ':'; + +CREATE TABLE parquet_types ( + cint int, + ctinyint tinyint, + csmallint smallint, + cfloat float, + cdouble double, + cstring1 string, + t timestamp, + cchar char(5), + cvarchar varchar(10), + cbinary binary, + m1 map, + l1 array, + st1 struct, + d date +) STORED AS PARQUET; + +LOAD DATA LOCAL INPATH '../../data/files/parquet_types.txt' OVERWRITE INTO TABLE parquet_types_staging; + +SELECT * FROM parquet_types_staging; + +INSERT OVERWRITE TABLE parquet_types +SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar, +unhex(cbinary), m1, l1, st1, d FROM parquet_types_staging; + +SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar, +hex(cbinary), m1, l1, st1, d FROM parquet_types; + +SELECT cchar, LENGTH(cchar), cvarchar, LENGTH(cvarchar) FROM parquet_types; + +-- test types in group by + +SELECT ctinyint, + MAX(cint), + MIN(csmallint), + COUNT(cstring1), + ROUND(AVG(cfloat), 5), + ROUND(STDDEV_POP(cdouble),5) +FROM parquet_types +GROUP BY ctinyint +ORDER BY ctinyint +; + +SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat; + +SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar; + +SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar; + +SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1; + +SELECT t, count(*) FROM parquet_types GROUP BY t ORDER BY t; + +SELECT hex(cbinary), 
count(*) FROM parquet_types GROUP BY cbinary; \ No newline at end of file diff --git ql/src/test/results/clientpositive/parquet_types_vectorization.q.out ql/src/test/results/clientpositive/parquet_types_vectorization.q.out new file mode 100644 index 0000000..ad743ef --- /dev/null +++ ql/src/test/results/clientpositive/parquet_types_vectorization.q.out @@ -0,0 +1,379 @@ +PREHOOK: query: DROP TABLE parquet_types_staging +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE parquet_types_staging +POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE parquet_types +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE parquet_types +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE parquet_types_staging ( + cint int, + ctinyint tinyint, + csmallint smallint, + cfloat float, + cdouble double, + cstring1 string, + t timestamp, + cchar char(5), + cvarchar varchar(10), + cbinary string, + m1 map, + l1 array, + st1 struct, + d date +) ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +COLLECTION ITEMS TERMINATED BY ',' +MAP KEYS TERMINATED BY ':' +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@parquet_types_staging +POSTHOOK: query: CREATE TABLE parquet_types_staging ( + cint int, + ctinyint tinyint, + csmallint smallint, + cfloat float, + cdouble double, + cstring1 string, + t timestamp, + cchar char(5), + cvarchar varchar(10), + cbinary string, + m1 map, + l1 array, + st1 struct, + d date +) ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +COLLECTION ITEMS TERMINATED BY ',' +MAP KEYS TERMINATED BY ':' +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@parquet_types_staging +PREHOOK: query: CREATE TABLE parquet_types ( + cint int, + ctinyint tinyint, + csmallint smallint, + cfloat float, + cdouble double, + cstring1 string, + t timestamp, + cchar char(5), + cvarchar varchar(10), + cbinary binary, + m1 map, + l1 array, + st1 struct, + d date +) STORED AS PARQUET +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@parquet_types +POSTHOOK: query: CREATE TABLE parquet_types ( + cint int, + ctinyint tinyint, + csmallint smallint, + cfloat float, + cdouble double, + cstring1 string, + t timestamp, + cchar char(5), + cvarchar varchar(10), + cbinary binary, + m1 map, + l1 array, + st1 struct, + d date +) STORED AS PARQUET +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@parquet_types +PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/parquet_types.txt' OVERWRITE INTO TABLE parquet_types_staging +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@parquet_types_staging +POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/parquet_types.txt' OVERWRITE INTO TABLE parquet_types_staging +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@parquet_types_staging +PREHOOK: query: SELECT * FROM parquet_types_staging +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types_staging +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM parquet_types_staging +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types_staging +#### A masked pattern was here #### +100 1 1 1.0 0.0 abc 2011-01-01 01:01:01.111111111 a a B4F3CAFDBEDD {"k1":"v1"} [101,200] {"c1":10,"c2":"a"} 2011-01-01 +101 2 2 1.1 0.3 def 2012-02-02 02:02:02.222222222 ab ab 68692CCAC0BDE7 {"k2":"v2"} [102,200] {"c1":10,"c2":"d"} 2012-02-02 +102 3 3 1.2 0.6 ghi 2013-03-03 03:03:03.333333333 abc abc 
B4F3CAFDBEDD {"k3":"v3"} [103,200] {"c1":10,"c2":"g"} 2013-03-03 +103 1 4 1.3 0.9 jkl 2014-04-04 04:04:04.444444444 abcd abcd 68692CCAC0BDE7 {"k4":"v4"} [104,200] {"c1":10,"c2":"j"} 2014-04-04 +104 2 5 1.4 1.2 mno 2015-05-05 05:05:05.555555555 abcde abcde B4F3CAFDBEDD {"k5":"v5"} [105,200] {"c1":10,"c2":"m"} 2015-05-05 +105 3 1 1.0 1.5 pqr 2016-06-06 06:06:06.666666666 abcde abcdef 68692CCAC0BDE7 {"k6":"v6"} [106,200] {"c1":10,"c2":"p"} 2016-06-06 +106 1 2 1.1 1.8 stu 2017-07-07 07:07:07.777777777 abcde abcdefg B4F3CAFDBEDD {"k7":"v7"} [107,200] {"c1":10,"c2":"s"} 2017-07-07 +107 2 3 1.2 2.1 vwx 2018-08-08 08:08:08.888888888 bcdef abcdefgh 68692CCAC0BDE7 {"k8":"v8"} [108,200] {"c1":10,"c2":"v"} 2018-08-08 +108 3 4 1.3 2.4 yza 2019-09-09 09:09:09.999999999 cdefg B4F3CAFDBE 68656C6C6F {"k9":"v9"} [109,200] {"c1":10,"c2":"y"} 2019-09-09 +109 1 5 1.4 2.7 bcd 2020-10-10 10:10:10.101010101 klmno abcdedef 68692CCAC0BDE7 {"k10":"v10"} [110,200] {"c1":10,"c2":"b"} 2020-10-10 +110 2 1 1.0 3.0 efg 2021-11-11 11:11:11.111111111 pqrst abcdede B4F3CAFDBEDD {"k11":"v11"} [111,200] {"c1":10,"c2":"e"} 2021-11-11 +111 3 2 1.1 3.3 hij 2022-12-12 12:12:12.121212121 nopqr abcded 68692CCAC0BDE7 {"k12":"v12"} [112,200] {"c1":10,"c2":"h"} 2022-12-12 +112 1 3 1.2 3.6 klm 2023-01-02 13:13:13.131313131 opqrs abcdd B4F3CAFDBEDD {"k13":"v13"} [113,200] {"c1":10,"c2":"k"} 2023-01-02 +113 2 4 1.3 3.9 nop 2024-02-02 14:14:14.141414141 pqrst abc 68692CCAC0BDE7 {"k14":"v14"} [114,200] {"c1":10,"c2":"n"} 2024-02-02 +114 3 5 1.4 4.2 qrs 2025-03-03 15:15:15.151515151 qrstu b B4F3CAFDBEDD {"k15":"v15"} [115,200] {"c1":10,"c2":"q"} 2025-03-03 +115 1 1 1.0 4.5 qrs 2026-04-04 16:16:16.161616161 rstuv abcded 68692CCAC0BDE7 {"k16":"v16"} [116,200] {"c1":10,"c2":"q"} 2026-04-04 +116 2 2 1.1 4.8 wxy 2027-05-05 17:17:17.171717171 stuvw abcded B4F3CAFDBEDD {"k17":"v17"} [117,200] {"c1":10,"c2":"w"} 2027-05-05 +117 3 3 1.2 5.1 zab 2028-06-06 18:18:18.181818181 tuvwx abcded 68692CCAC0BDE7 {"k18":"v18"} [118,200] {"c1":10,"c2":"z"} 2028-06-06 +118 1 4 1.3 5.4 cde 2029-07-07 19:19:19.191919191 uvwzy abcdede B4F3CAFDBEDD {"k19":"v19"} [119,200] {"c1":10,"c2":"c"} 2029-07-07 +119 2 5 1.4 5.7 fgh 2030-08-08 20:20:20.202020202 vwxyz abcdede 68692CCAC0BDE7 {"k20":"v20"} [120,200] {"c1":10,"c2":"f"} 2030-08-08 +120 3 1 1.0 6.0 ijk 2031-09-09 21:21:21.212121212 wxyza abcde B4F3CAFDBEDD {"k21":"v21"} [121,200] {"c1":10,"c2":"i"} 2031-09-09 +121 1 2 1.1 6.3 lmn 2032-10-10 22:22:22.222222222 bcdef abcde {"k22":"v22"} [122,200] {"c1":10,"c2":"l"} 2032-10-10 +PREHOOK: query: INSERT OVERWRITE TABLE parquet_types +SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar, +unhex(cbinary), m1, l1, st1, d FROM parquet_types_staging +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types_staging +PREHOOK: Output: default@parquet_types +POSTHOOK: query: INSERT OVERWRITE TABLE parquet_types +SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar, +unhex(cbinary), m1, l1, st1, d FROM parquet_types_staging +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types_staging +POSTHOOK: Output: default@parquet_types +POSTHOOK: Lineage: parquet_types.cbinary EXPRESSION [(parquet_types_staging)parquet_types_staging.FieldSchema(name:cbinary, type:string, comment:null), ] +POSTHOOK: Lineage: parquet_types.cchar SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:cchar, type:char(5), comment:null), ] +POSTHOOK: Lineage: parquet_types.cdouble SIMPLE 
[(parquet_types_staging)parquet_types_staging.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: parquet_types.cfloat SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: parquet_types.cint SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: parquet_types.csmallint SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:csmallint, type:smallint, comment:null), ] +POSTHOOK: Lineage: parquet_types.cstring1 SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:cstring1, type:string, comment:null), ] +POSTHOOK: Lineage: parquet_types.ctinyint SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:ctinyint, type:tinyint, comment:null), ] +POSTHOOK: Lineage: parquet_types.cvarchar SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:cvarchar, type:varchar(10), comment:null), ] +POSTHOOK: Lineage: parquet_types.d SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:d, type:date, comment:null), ] +POSTHOOK: Lineage: parquet_types.l1 SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:l1, type:array, comment:null), ] +POSTHOOK: Lineage: parquet_types.m1 SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:m1, type:map, comment:null), ] +POSTHOOK: Lineage: parquet_types.st1 SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:st1, type:struct, comment:null), ] +POSTHOOK: Lineage: parquet_types.t SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:t, type:timestamp, comment:null), ] +PREHOOK: query: SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar, +hex(cbinary), m1, l1, st1, d FROM parquet_types +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar, +hex(cbinary), m1, l1, st1, d FROM parquet_types +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +100 1 1 1.0 0.0 abc 2011-01-01 01:01:01.111111111 a a B4F3CAFDBEDD {"k1":"v1"} [101,200] {"c1":10,"c2":"a"} 2011-01-01 +101 2 2 1.1 0.3 def 2012-02-02 02:02:02.222222222 ab ab 68692CCAC0BDE7 {"k2":"v2"} [102,200] {"c1":10,"c2":"d"} 2012-02-02 +102 3 3 1.2 0.6 ghi 2013-03-03 03:03:03.333333333 abc abc B4F3CAFDBEDD {"k3":"v3"} [103,200] {"c1":10,"c2":"g"} 2013-03-03 +103 1 4 1.3 0.9 jkl 2014-04-04 04:04:04.444444444 abcd abcd 68692CCAC0BDE7 {"k4":"v4"} [104,200] {"c1":10,"c2":"j"} 2014-04-04 +104 2 5 1.4 1.2 mno 2015-05-05 05:05:05.555555555 abcde abcde B4F3CAFDBEDD {"k5":"v5"} [105,200] {"c1":10,"c2":"m"} 2015-05-05 +105 3 1 1.0 1.5 pqr 2016-06-06 06:06:06.666666666 abcde abcdef 68692CCAC0BDE7 {"k6":"v6"} [106,200] {"c1":10,"c2":"p"} 2016-06-06 +106 1 2 1.1 1.8 stu 2017-07-07 07:07:07.777777777 abcde abcdefg B4F3CAFDBEDD {"k7":"v7"} [107,200] {"c1":10,"c2":"s"} 2017-07-07 +107 2 3 1.2 2.1 vwx 2018-08-08 08:08:08.888888888 bcdef abcdefgh 68692CCAC0BDE7 {"k8":"v8"} [108,200] {"c1":10,"c2":"v"} 2018-08-08 +108 3 4 1.3 2.4 yza 2019-09-09 09:09:09.999999999 cdefg B4F3CAFDBE 68656C6C6F {"k9":"v9"} [109,200] {"c1":10,"c2":"y"} 2019-09-09 +109 1 5 1.4 2.7 bcd 2020-10-10 10:10:10.101010101 klmno abcdedef 68692CCAC0BDE7 {"k10":"v10"} [110,200] {"c1":10,"c2":"b"} 2020-10-10 +110 2 1 1.0 3.0 efg 2021-11-11 11:11:11.111111111 pqrst 
abcdede B4F3CAFDBEDD {"k11":"v11"} [111,200] {"c1":10,"c2":"e"} 2021-11-11 +111 3 2 1.1 3.3 hij 2022-12-12 12:12:12.121212121 nopqr abcded 68692CCAC0BDE7 {"k12":"v12"} [112,200] {"c1":10,"c2":"h"} 2022-12-12 +112 1 3 1.2 3.6 klm 2023-01-02 13:13:13.131313131 opqrs abcdd B4F3CAFDBEDD {"k13":"v13"} [113,200] {"c1":10,"c2":"k"} 2023-01-02 +113 2 4 1.3 3.9 nop 2024-02-02 14:14:14.141414141 pqrst abc 68692CCAC0BDE7 {"k14":"v14"} [114,200] {"c1":10,"c2":"n"} 2024-02-02 +114 3 5 1.4 4.2 qrs 2025-03-03 15:15:15.151515151 qrstu b B4F3CAFDBEDD {"k15":"v15"} [115,200] {"c1":10,"c2":"q"} 2025-03-03 +115 1 1 1.0 4.5 qrs 2026-04-04 16:16:16.161616161 rstuv abcded 68692CCAC0BDE7 {"k16":"v16"} [116,200] {"c1":10,"c2":"q"} 2026-04-04 +116 2 2 1.1 4.8 wxy 2027-05-05 17:17:17.171717171 stuvw abcded B4F3CAFDBEDD {"k17":"v17"} [117,200] {"c1":10,"c2":"w"} 2027-05-05 +117 3 3 1.2 5.1 zab 2028-06-06 18:18:18.181818181 tuvwx abcded 68692CCAC0BDE7 {"k18":"v18"} [118,200] {"c1":10,"c2":"z"} 2028-06-06 +118 1 4 1.3 5.4 cde 2029-07-07 19:19:19.191919191 uvwzy abcdede B4F3CAFDBEDD {"k19":"v19"} [119,200] {"c1":10,"c2":"c"} 2029-07-07 +119 2 5 1.4 5.7 fgh 2030-08-08 20:20:20.202020202 vwxyz abcdede 68692CCAC0BDE7 {"k20":"v20"} [120,200] {"c1":10,"c2":"f"} 2030-08-08 +120 3 1 1.0 6.0 ijk 2031-09-09 21:21:21.212121212 wxyza abcde B4F3CAFDBEDD {"k21":"v21"} [121,200] {"c1":10,"c2":"i"} 2031-09-09 +121 1 2 1.1 6.3 lmn 2032-10-10 22:22:22.222222222 bcdef abcde {"k22":"v22"} [122,200] {"c1":10,"c2":"l"} 2032-10-10 +PREHOOK: query: SELECT cchar, LENGTH(cchar), cvarchar, LENGTH(cvarchar) FROM parquet_types +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT cchar, LENGTH(cchar), cvarchar, LENGTH(cvarchar) FROM parquet_types +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +a 1 a 3 +ab 2 ab 3 +abc 3 abc 3 +abcd 4 abcd 4 +abcde 5 abcde 5 +abcde 5 abcdef 6 +abcde 5 abcdefg 7 +bcdef 5 abcdefgh 8 +cdefg 5 B4F3CAFDBE 10 +klmno 5 abcdedef 8 +pqrst 5 abcdede 7 +nopqr 5 abcded 6 +opqrs 5 abcdd 5 +pqrst 5 abc 3 +qrstu 5 b 1 +rstuv 5 abcded 6 +stuvw 5 abcded 6 +tuvwx 5 abcded 6 +uvwzy 5 abcdede 7 +vwxyz 5 abcdede 7 +wxyza 5 abcde 5 +bcdef 5 abcde 5 +PREHOOK: query: -- test types in group by + +SELECT ctinyint, + MAX(cint), + MIN(csmallint), + COUNT(cstring1), + ROUND(AVG(cfloat), 5), + ROUND(STDDEV_POP(cdouble),5) +FROM parquet_types +GROUP BY ctinyint +ORDER BY ctinyint +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: -- test types in group by + +SELECT ctinyint, + MAX(cint), + MIN(csmallint), + COUNT(cstring1), + ROUND(AVG(cfloat), 5), + ROUND(STDDEV_POP(cdouble),5) +FROM parquet_types +GROUP BY ctinyint +ORDER BY ctinyint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +1 121 1 8 1.175 2.06216 +2 119 1 7 1.21429 1.8 +3 120 1 7 1.17143 1.8 +PREHOOK: query: SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +1.0 5 +1.1 5 +1.2 4 +1.3 4 +1.4 4 +PREHOOK: query: SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types 
+#### A masked pattern was here #### +POSTHOOK: query: SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +a 1 +ab 1 +abc 1 +abcd 1 +abcde 3 +bcdef 2 +cdefg 1 +klmno 1 +nopqr 1 +opqrs 1 +pqrst 2 +qrstu 1 +rstuv 1 +stuvw 1 +tuvwx 1 +uvwzy 1 +vwxyz 1 +wxyza 1 +PREHOOK: query: SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +B4F3CAFDBE 1 +a 1 +ab 1 +abc 2 +abcd 1 +abcdd 1 +abcde 3 +abcded 4 +abcdede 3 +abcdedef 1 +abcdef 1 +abcdefg 1 +abcdefgh 1 +b 1 +PREHOOK: query: SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1 +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +abc 1 +bcd 1 +cde 1 +def 1 +efg 1 +fgh 1 +ghi 1 +hij 1 +ijk 1 +jkl 1 +klm 1 +lmn 1 +mno 1 +nop 1 +pqr 1 +qrs 2 +stu 1 +vwx 1 +wxy 1 +yza 1 +zab 1 +PREHOOK: query: SELECT t, count(*) FROM parquet_types GROUP BY t ORDER BY t +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT t, count(*) FROM parquet_types GROUP BY t ORDER BY t +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +2011-01-01 01:01:01.111111111 1 +2012-02-02 02:02:02.222222222 1 +2013-03-03 03:03:03.333333333 1 +2014-04-04 04:04:04.444444444 1 +2015-05-05 05:05:05.555555555 1 +2016-06-06 06:06:06.666666666 1 +2017-07-07 07:07:07.777777777 1 +2018-08-08 08:08:08.888888888 1 +2019-09-09 09:09:09.999999999 1 +2020-10-10 10:10:10.101010101 1 +2021-11-11 11:11:11.111111111 1 +2022-12-12 12:12:12.121212121 1 +2023-01-02 13:13:13.131313131 1 +2024-02-02 14:14:14.141414141 1 +2025-03-03 15:15:15.151515151 1 +2026-04-04 16:16:16.161616161 1 +2027-05-05 17:17:17.171717171 1 +2028-06-06 18:18:18.181818181 1 +2029-07-07 19:19:19.191919191 1 +2030-08-08 20:20:20.202020202 1 +2031-09-09 21:21:21.212121212 1 +2032-10-10 22:22:22.222222222 1 +PREHOOK: query: SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### + 1 +68656C6C6F 1 +68692CCAC0BDE7 10 +B4F3CAFDBEDD 10