diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/AbstractParquetRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/AbstractParquetRecordReader.java new file mode 100644 index 0000000..b288a21 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/AbstractParquetRecordReader.java @@ -0,0 +1,169 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.parquet; + +import com.google.common.base.Strings; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter; +import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.RowGroupFilter; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.ParquetInputSplit; +import org.apache.parquet.hadoop.api.InitContext; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class AbstractParquetRecordReader { + public static final Logger LOG = LoggerFactory.getLogger(AbstractParquetRecordReader.class); + + protected Path file; + protected ProjectionPusher projectionPusher; + protected boolean skipTimestampConversion = false; + protected SerDeStats serDeStats; + protected JobConf jobConf; + + protected int schemaSize; + protected List filtedBlocks; + protected ParquetFileReader reader; + + /** + * gets a ParquetInputSplit corresponding to a split given by Hive + * + * @param oldSplit The split given by Hive + * @param conf The JobConf of the Hive job + * @return a ParquetInputSplit corresponding to the oldSplit + * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file + */ + @SuppressWarnings("deprecation") + protected ParquetInputSplit getSplit( + final org.apache.hadoop.mapred.InputSplit oldSplit, + final JobConf conf + ) throws IOException { + ParquetInputSplit split; + if (oldSplit instanceof FileSplit) { + final Path finalPath = ((FileSplit) oldSplit).getPath(); + jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent()); + + final ParquetMetadata parquetMetadata = 
ParquetFileReader.readFooter(jobConf, finalPath); + final List blocks = parquetMetadata.getBlocks(); + final FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); + + final ReadSupport.ReadContext + readContext = new DataWritableReadSupport().init(new InitContext(jobConf, + null, fileMetaData.getSchema())); + + // Compute stats + for (BlockMetaData bmd : blocks) { + serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount()); + serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize()); + } + + schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata() + .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount(); + final List splitGroup = new ArrayList(); + final long splitStart = ((FileSplit) oldSplit).getStart(); + final long splitLength = ((FileSplit) oldSplit).getLength(); + for (final BlockMetaData block : blocks) { + final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); + if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) { + splitGroup.add(block); + } + } + if (splitGroup.isEmpty()) { + LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit); + return null; + } + + FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema()); + if (filter != null) { + filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema()); + if (filtedBlocks.isEmpty()) { + LOG.debug("All row groups are dropped due to filter predicates"); + return null; + } + + long droppedBlocks = splitGroup.size() - filtedBlocks.size(); + if (droppedBlocks > 0) { + LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate"); + } + } else { + filtedBlocks = splitGroup; + } + + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) { + skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr"); + } + split = new ParquetInputSplit(finalPath, + splitStart, + splitLength, + ((FileSplit) oldSplit).getLocations(), + filtedBlocks, + readContext.getRequestedSchema().toString(), + fileMetaData.getSchema().toString(), + fileMetaData.getKeyValueMetaData(), + readContext.getReadSupportMetadata()); + return split; + } else { + throw new IllegalArgumentException("Unknown split type: " + oldSplit); + } + } + + public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { + SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf); + if (sarg == null) { + return null; + } + + // Create the Parquet FilterPredicate without including columns that do not exist + // on the schema (such as partition columns). + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema); + if (p != null) { + // Filter may have sensitive information. Do not send to debug. + LOG.debug("PARQUET predicate push down generated."); + ParquetInputFormat.setFilterPredicate(conf, p); + return FilterCompat.get(p); + } else { + // Filter may have sensitive information. Do not send to debug. 
+ LOG.debug("No PARQUET predicate push down is generated."); + return null; + } + } + + public List getFiltedBlocks() { + return filtedBlocks; + } + + public SerDeStats getStats() { + return serDeStats; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java index 5b65e5c..f4fadbb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java @@ -14,6 +14,8 @@ package org.apache.hadoop.hive.ql.io.parquet; import java.io.IOException; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -34,7 +36,8 @@ * NOTE: With HIVE-9235 we removed "implements VectorizedParquetInputFormat" since all data types * are not currently supported. Removing the interface turns off vectorization. */ -public class MapredParquetInputFormat extends FileInputFormat { +public class MapredParquetInputFormat extends FileInputFormat + implements VectorizedInputFormatInterface { private static final Logger LOG = LoggerFactory.getLogger(MapredParquetInputFormat.class); @@ -48,7 +51,7 @@ public MapredParquetInputFormat() { protected MapredParquetInputFormat(final ParquetInputFormat inputFormat) { this.realInput = inputFormat; - vectorizedSelf = new VectorizedParquetInputFormat(inputFormat); + vectorizedSelf = new VectorizedParquetInputFormat(); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -69,8 +72,7 @@ protected MapredParquetInputFormat(final ParquetInputFormat input if (LOG.isDebugEnabled()) { LOG.debug("Using row-mode record reader"); } - return (RecordReader) - new ParquetRecordReaderWrapper(realInput, split, job, reporter); + return new ParquetRecordReaderWrapper(realInput, split, job, reporter); } } catch (final InterruptedException e) { throw new RuntimeException("Cannot create a RecordReaderWrapper", e); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java index 2072533..322178a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java @@ -15,147 +15,29 @@ import java.io.IOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssign; -import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssignFactory; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader; +import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; import 
org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.parquet.hadoop.ParquetInputFormat; /** * Vectorized input format for Parquet files */ -public class VectorizedParquetInputFormat extends FileInputFormat - implements VectorizedInputFormatInterface { - - private static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetInputFormat.class); - - /** - * Vectorized record reader for vectorized Parquet input format - */ - private static class VectorizedParquetRecordReader implements - RecordReader { - private static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetRecordReader.class); - - private final ParquetRecordReaderWrapper internalReader; - private VectorizedRowBatchCtx rbCtx; - private Object[] partitionValues; - private ArrayWritable internalValues; - private NullWritable internalKey; - private VectorColumnAssign[] assigners; - - public VectorizedParquetRecordReader( - ParquetInputFormat realInput, - FileSplit split, - JobConf conf, Reporter reporter) throws IOException, InterruptedException { - internalReader = new ParquetRecordReaderWrapper( - realInput, - split, - conf, - reporter); - rbCtx = Utilities.getVectorizedRowBatchCtx(conf); - int partitionColumnCount = rbCtx.getPartitionColumnCount(); - if (partitionColumnCount > 0) { - partitionValues = new Object[partitionColumnCount]; - rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues); - } - } - - @Override - public NullWritable createKey() { - internalKey = internalReader.createKey(); - return NullWritable.get(); - } - - @Override - public VectorizedRowBatch createValue() { - VectorizedRowBatch outputBatch; - outputBatch = rbCtx.createVectorizedRowBatch(); - internalValues = internalReader.createValue(); - return outputBatch; - } - - @Override - public long getPos() throws IOException { - return internalReader.getPos(); - } +public class VectorizedParquetInputFormat + extends FileInputFormat { - @Override - public void close() throws IOException { - internalReader.close(); - } - - @Override - public float getProgress() throws IOException { - return internalReader.getProgress(); - } - - @Override - public boolean next(NullWritable key, VectorizedRowBatch outputBatch) - throws IOException { - if (assigners != null) { - assert(outputBatch.numCols == assigners.length); - } - outputBatch.reset(); - int maxSize = outputBatch.getMaxSize(); - try { - while (outputBatch.size < maxSize) { - if (false == internalReader.next(internalKey, internalValues)) { - outputBatch.endOfFile = true; - break; - } - Writable[] writables = internalValues.get(); - - if (null == assigners) { - // Normally we'd build the assigners from the rbCtx.rowOI, but with Parquet - // we have a discrepancy between the metadata type (Eg. tinyint -> BYTE) and - // the writable value (IntWritable). see Parquet's ETypeConverter class. 
- assigners = VectorColumnAssignFactory.buildAssigners(outputBatch, writables); - } - - for(int i = 0; i < writables.length; ++i) { - assigners[i].assignObjectValue(writables[i], outputBatch.size); - } - ++outputBatch.size; - } - } catch (HiveException e) { - throw new RuntimeException(e); - } - return outputBatch.size > 0; - } + public VectorizedParquetInputFormat() { } - private final ParquetInputFormat realInput; - - public VectorizedParquetInputFormat(ParquetInputFormat realInput) { - this.realInput = realInput; - } - - @SuppressWarnings("unchecked") @Override public RecordReader getRecordReader( - InputSplit split, JobConf conf, Reporter reporter) throws IOException { - try { - return (RecordReader) - new VectorizedParquetRecordReader(realInput, (FileSplit) split, conf, reporter); - } catch (final InterruptedException e) { - throw new RuntimeException("Cannot create a VectorizedParquetRecordReader", e); - } + InputSplit inputSplit, + JobConf jobConf, + Reporter reporter) throws IOException { + return new VectorizedParquetRecordReader(inputSplit, jobConf); } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java index 8d8b0c5..0cf1be3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java @@ -70,7 +70,7 @@ * @param columns comma separated list of columns * @return list with virtual columns removed */ - private static List getColumnNames(final String columns) { + public static List getColumnNames(final String columns) { return (List) VirtualColumn. removeVirtualColumns(StringUtils.getStringCollection(columns)); } @@ -82,7 +82,7 @@ * @param types Comma separated list of types * @return A list of TypeInfo objects. */ - private static List getColumnTypes(final String types) { + public static List getColumnTypes(final String types) { return TypeInfoUtils.getTypeInfosFromTypeString(types); } @@ -177,7 +177,10 @@ private static Type getProjectedType(TypeInfo colType, Type fieldType) { * @param colTypes List of column types. * @return A MessageType object of projected columns. */ - private static MessageType getSchemaByName(MessageType schema, List colNames, List colTypes) { + public static MessageType getSchemaByName( + MessageType schema, + List colNames, + List colTypes) { List projectedFields = getProjectedGroupFields(schema, colNames, colTypes); Type[] typesArray = projectedFields.toArray(new Type[0]); @@ -195,7 +198,10 @@ private static MessageType getSchemaByName(MessageType schema, List colN * @param colIndexes List of column indexes. * @return A MessageType object of the column names found. 
*/ - private static MessageType getSchemaByIndex(MessageType schema, List colNames, List colIndexes) { + public static MessageType getSchemaByIndex( + MessageType schema, + List colNames, + List colIndexes) { List schemaTypes = new ArrayList(); for (Integer i : colIndexes) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java index d2e1b13..d1cfb0c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java @@ -14,24 +14,20 @@ package org.apache.hadoop.hive.ql.io.parquet.read; import java.io.IOException; -import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.hive.ql.io.parquet.AbstractParquetRecordReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher; -import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; -import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; @@ -39,25 +35,13 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.parquet.filter2.compat.FilterCompat; -import org.apache.parquet.filter2.compat.RowGroupFilter; -import org.apache.parquet.filter2.predicate.FilterPredicate; -import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.parquet.hadoop.ParquetInputSplit; -import org.apache.parquet.hadoop.api.InitContext; -import org.apache.parquet.hadoop.api.ReadSupport.ReadContext; import org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.FileMetaData; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.ContextUtil; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; -import com.google.common.base.Strings; - -public class ParquetRecordReaderWrapper implements RecordReader, - StatsProvidingRecordReader { +public class ParquetRecordReaderWrapper extends AbstractParquetRecordReader + implements RecordReader, StatsProvidingRecordReader { public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderWrapper.class); private final long splitLen; // for getPos() @@ -68,12 +52,6 @@ private ArrayWritable valueObj = null; private boolean firstRecord = false; private boolean eof = false; - private int schemaSize; - private boolean skipTimestampConversion = false; - private JobConf jobConf; - private final ProjectionPusher projectionPusher; - private List filtedBlocks; - private final SerDeStats serDeStats; public ParquetRecordReaderWrapper( final ParquetInputFormat newInputFormat, @@ -137,27 +115,6 @@ public ParquetRecordReaderWrapper( } } - public FilterCompat.Filter 
setFilter(final JobConf conf, MessageType schema) { - SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf); - if (sarg == null) { - return null; - } - - // Create the Parquet FilterPredicate without including columns that do not exist - // on the shema (such as partition columns). - FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema); - if (p != null) { - // Filter may have sensitive information. Do not send to debug. - LOG.debug("PARQUET predicate push down generated."); - ParquetInputFormat.setFilterPredicate(conf, p); - return FilterCompat.get(p); - } else { - // Filter may have sensitive information. Do not send to debug. - LOG.debug("No PARQUET predicate push down is generated."); - return null; - } - } - @Override public void close() throws IOException { if (realReader != null) { @@ -227,94 +184,4 @@ public boolean next(final NullWritable key, final ArrayWritable value) throws IO throw new IOException(e); } } - - /** - * gets a ParquetInputSplit corresponding to a split given by Hive - * - * @param oldSplit The split given by Hive - * @param conf The JobConf of the Hive job - * @return a ParquetInputSplit corresponding to the oldSplit - * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file - */ - @SuppressWarnings("deprecation") - protected ParquetInputSplit getSplit( - final InputSplit oldSplit, - final JobConf conf - ) throws IOException { - ParquetInputSplit split; - if (oldSplit instanceof FileSplit) { - final Path finalPath = ((FileSplit) oldSplit).getPath(); - jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent()); - - final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath); - final List blocks = parquetMetadata.getBlocks(); - final FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); - - final ReadContext readContext = new DataWritableReadSupport().init(new InitContext(jobConf, - null, fileMetaData.getSchema())); - - // Compute stats - for (BlockMetaData bmd : blocks) { - serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount()); - serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize()); - } - - schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata() - .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount(); - final List splitGroup = new ArrayList(); - final long splitStart = ((FileSplit) oldSplit).getStart(); - final long splitLength = ((FileSplit) oldSplit).getLength(); - for (final BlockMetaData block : blocks) { - final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); - if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) { - splitGroup.add(block); - } - } - if (splitGroup.isEmpty()) { - LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit); - return null; - } - - FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema()); - if (filter != null) { - filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema()); - if (filtedBlocks.isEmpty()) { - LOG.debug("All row groups are dropped due to filter predicates"); - return null; - } - - long droppedBlocks = splitGroup.size() - filtedBlocks.size(); - if (droppedBlocks > 0) { - LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate"); - } - } else { - filtedBlocks = splitGroup; - } - - if (HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) { - skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr"); - } - split = new ParquetInputSplit(finalPath, - splitStart, - splitLength, - ((FileSplit) oldSplit).getLocations(), - filtedBlocks, - readContext.getRequestedSchema().toString(), - fileMetaData.getSchema().toString(), - fileMetaData.getKeyValueMetaData(), - readContext.getReadSupportMetadata()); - return split; - } else { - throw new IllegalArgumentException("Unknown split type: " + oldSplit); - } - } - - public List getFiltedBlocks() { - return filtedBlocks; - } - - @Override - public SerDeStats getStats() { - return serDeStats; - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java index aace48e..3fd75d2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java @@ -49,7 +49,7 @@ private static Calendar getLocalCalendar() { return parquetLocalCalendar.get(); } - private static Calendar getCalendar(boolean skipConversion) { + public static Calendar getCalendar(boolean skipConversion) { Calendar calendar = skipConversion ? getLocalCalendar() : getGMTCalendar(); calendar.clear(); // Reset all fields before reusing this instance return calendar; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java new file mode 100644 index 0000000..6fa6213 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java @@ -0,0 +1,605 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; +import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Timestamp; +import java.util.Arrays; + +import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; +import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; +import static org.apache.parquet.column.ValuesType.VALUES; + +/** + * A column-level Parquet reader that reads a batch of records for a single column; parts of + * the code are adapted from Apache Spark and Apache Parquet. + */ +public class VectorizedColumnReader { + + private static final Logger LOG = LoggerFactory.getLogger(VectorizedColumnReader.class); + + private boolean skipTimestampConversion = false; + + /** + * Total number of values read. + */ + private long valuesRead; + + /** + * Value that indicates the end of the current page. That is, + * if valuesRead == endOfPageValueCount, we are at the end of the page. + */ + private long endOfPageValueCount; + + /** + * The dictionary, if this column has dictionary encoding. + */ + private final Dictionary dictionary; + + /** + * If true, the current page is dictionary encoded. + */ + private boolean isCurrentPageDictionaryEncoded; + + /** + * Maximum definition level for this column. + */ + private final int maxDefLevel; + + private int definitionLevel; + private int repetitionLevel; + + /** + * Repetition/Definition/Value readers. + */ + private IntIterator repetitionLevelColumn; + private IntIterator definitionLevelColumn; + private ValuesReader dataColumn; + + /** + * Total values in the current page. 
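+   * Reset whenever a new data page is read (see initDataReader and readPageV2).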
+ */ + private int pageValueCount; + + private final PageReader pageReader; + private final ColumnDescriptor descriptor; + private final Type type; + + public VectorizedColumnReader( + ColumnDescriptor descriptor, + PageReader pageReader, + boolean skipTimestampConversion, + Type type) throws IOException { + this.descriptor = descriptor; + this.type = type; + this.pageReader = pageReader; + this.maxDefLevel = descriptor.getMaxDefinitionLevel(); + this.skipTimestampConversion = skipTimestampConversion; + + DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); + if (dictionaryPage != null) { + try { + this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage); + this.isCurrentPageDictionaryEncoded = true; + } catch (IOException e) { + throw new IOException("could not decode the dictionary for " + descriptor, e); + } + } else { + this.dictionary = null; + this.isCurrentPageDictionaryEncoded = false; + } + } + + void readBatch( + int total, + ColumnVector column, + TypeInfo columnType) throws IOException { + + int rowId = 0; + while (total > 0) { + // Compute the number of values we want to read in this page. + consume(); + int leftInPage = (int) (endOfPageValueCount - valuesRead); + int num = Math.min(total, leftInPage); + if (isCurrentPageDictionaryEncoded) { + LongColumnVector dictionaryIds = new LongColumnVector(); + // Read and decode dictionary ids. + readDictionaryIDs(num, dictionaryIds, rowId); + decodeDictionaryIds(rowId, num, column, dictionaryIds); + } else { + // assign values in vector + PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType; + switch (primitiveColumnType.getPrimitiveCategory()) { + case INT: + case BYTE: + case SHORT: + readIntegers(num, (LongColumnVector) column, rowId); + break; + case DATE: + case INTERVAL_YEAR_MONTH: + case LONG: + readLongs(num, (LongColumnVector) column, rowId); + break; + case BOOLEAN: + readBooleans(num, (LongColumnVector) column, rowId); + break; + case DOUBLE: + readDoubles(num, (DoubleColumnVector) column, rowId); + break; + case BINARY: + case STRING: + case CHAR: + case VARCHAR: + readBinaries(num, (BytesColumnVector) column, rowId); + break; + case FLOAT: + readFloats(num, (DoubleColumnVector) column, rowId); + break; + case DECIMAL: + readDecimal(num, (DecimalColumnVector) column, rowId); + case INTERVAL_DAY_TIME: + case TIMESTAMP: + break; + default: + throw new IOException("Unsupported"); + } + } + rowId += num; + total -= num; + } + } + + private void consume() throws IOException { + int leftInPage = (int) (endOfPageValueCount - valuesRead); + if (leftInPage == 0) { + readPage(); + } + } + + private void readDictionaryIDs( + int total, + LongColumnVector c, + int rowId) throws IOException { + int left = total; + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId] = dataColumn.readValueDictionaryId(); + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + private void readIntegers( + int total, + LongColumnVector c, + int rowId) throws IOException { + int left = total; + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId] = dataColumn.readInteger(); + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + 
c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + private void readDoubles( + int total, + DoubleColumnVector c, + int rowId) throws IOException { + int left = total; + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId] = dataColumn.readDouble(); + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + private void readBooleans( + int total, + LongColumnVector c, + int rowId) throws IOException { + int left = total; + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId] = dataColumn.readBoolean() ? 1 : 0; + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + private void readLongs( + int total, + LongColumnVector c, + int rowId) throws IOException { + int left = total; + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId] = dataColumn.readLong(); + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + private void readFloats( + int total, + DoubleColumnVector c, + int rowId) throws IOException { + int left = total; + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId] = dataColumn.readFloat(); + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + private void readDecimal( + int total, + DecimalColumnVector c, + int rowId) throws IOException { + int left = total; + c.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision(); + c.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale(); + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId] = new HiveDecimalWritable(dataColumn.readBytes().getBytesUnsafe(), + type.asPrimitiveType().getDecimalMetadata().getScale()); + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + private void readBinaries( + int total, + BytesColumnVector c, + int rowId) throws IOException { + int left = total; + while (left > 0) { + consume(); + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + Binary binary = dataColumn.readBytes(); + c.setVal(rowId, binary.getBytesUnsafe()); + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && Arrays.equals(binary.getBytesUnsafe(), + ArrayUtils.subarray(c.vector[0], c.start[0], c.length[0])); + } else { + c.isNull[rowId] = true; + c.isRepeating = false; + c.noNulls = false; + } + rowId++; + left--; + } + } + + /** + * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`. 
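+   * Null flags, noNulls and isRepeating are carried over from the dictionary-id vector before the ids are decoded into typed values.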
+ */ + private void decodeDictionaryIds(int rowId, int num, ColumnVector column, + LongColumnVector dictionaryIds) { + System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num); + if (column.noNulls) { + column.noNulls = dictionaryIds.noNulls ? true : false; + } + column.isRepeating = column.isRepeating && dictionaryIds.isRepeating; + + switch (descriptor.getType()) { + case INT32: + for (int i = rowId; i < rowId + num; ++i) { + ((LongColumnVector) column).vector[i] = + dictionary.decodeToInt((int) dictionaryIds.vector[i]); + } + break; + case INT64: + for (int i = rowId; i < rowId + num; ++i) { + ((LongColumnVector) column).vector[i] = + dictionary.decodeToLong((int) dictionaryIds.vector[i]); + } + break; + case FLOAT: + for (int i = rowId; i < rowId + num; ++i) { + ((DoubleColumnVector) column).vector[i] = + dictionary.decodeToFloat((int) dictionaryIds.vector[i]); + } + break; + case DOUBLE: + for (int i = rowId; i < rowId + num; ++i) { + ((DoubleColumnVector) column).vector[i] = + dictionary.decodeToDouble((int) dictionaryIds.vector[i]); + } + break; + case INT96: + for (int i = rowId; i < rowId + num; ++i) { + ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer(); + buf.order(ByteOrder.LITTLE_ENDIAN); + long timeOfDayNanos = buf.getLong(); + int julianDay = buf.getInt(); + NanoTime nt = new NanoTime(julianDay, timeOfDayNanos); + Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion); + ((TimestampColumnVector) column).set(i, ts); + } + break; + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + for (int i = rowId; i < rowId + num; ++i) { + ((BytesColumnVector) column) + .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe()); + } + break; + default: + throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType()); + } + } + + private void readRepetitionAndDefinitionLevels() { + repetitionLevel = repetitionLevelColumn.nextInt(); + definitionLevel = definitionLevelColumn.nextInt(); + valuesRead++; + } + + private void readPage() throws IOException { + DataPage page = pageReader.readPage(); + // TODO: Why is this a visitor? 
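+    // The visitor dispatches on the concrete page type so that V1 and V2 data pages are each initialized by their own read path.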
+ page.accept(new DataPage.Visitor() { + @Override + public Void visit(DataPageV1 dataPageV1) { + readPageV1(dataPageV1); + return null; + } + + @Override + public Void visit(DataPageV2 dataPageV2) { + readPageV2(dataPageV2); + return null; + } + }); + } + + private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException { + this.pageValueCount = valueCount; + this.endOfPageValueCount = valuesRead + pageValueCount; + if (dataEncoding.usesDictionary()) { + this.dataColumn = null; + if (dictionary == null) { + throw new IOException( + "could not read page in col " + descriptor + + " as the dictionary was missing for encoding " + dataEncoding); + } + dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary); + this.isCurrentPageDictionaryEncoded = true; + } else { + if (dataEncoding != Encoding.PLAIN) { + throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding); + } + dataColumn = dataEncoding.getValuesReader(descriptor, VALUES); + this.isCurrentPageDictionaryEncoded = false; + } + + try { + dataColumn.initFromPage(pageValueCount, bytes, offset); + } catch (IOException e) { + throw new IOException("could not read page in col " + descriptor, e); + } + } + + private void readPageV1(DataPageV1 page) { + ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); + ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL); + this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader); + this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader); + try { + byte[] bytes = page.getBytes().toByteArray(); + LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records"); + LOG.debug("reading repetition levels at 0"); + rlReader.initFromPage(pageValueCount, bytes, 0); + int next = rlReader.getNextOffset(); + LOG.debug("reading definition levels at " + next); + dlReader.initFromPage(pageValueCount, bytes, next); + next = dlReader.getNextOffset(); + LOG.debug("reading data at " + next); + initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount()); + } catch (IOException e) { + throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e); + } + } + + private void readPageV2(DataPageV2 page) { + this.pageValueCount = page.getValueCount(); + this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(), + page.getRepetitionLevels()); + this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels()); + try { + LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records"); + initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount()); + } catch (IOException e) { + throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e); + } + } + + private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) { + try { + if (maxLevel == 0) { + return new NullIntIterator(); + } + return new RLEIntIterator( + new RunLengthBitPackingHybridDecoder( + BytesUtils.getWidthFromMaxInt(maxLevel), + new ByteArrayInputStream(bytes.toByteArray()))); + } catch (IOException e) { + throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e); + } + } + + /** + * Creates a reader for definition and repetition levels, returning an optimized one if + * the levels are not needed. 
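+   * When maxLevel is 0 there are no levels to read, so a constant NullIntIterator is returned instead of an RLE decoder.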
+ */ + protected static IntIterator createRLEIterator( + int maxLevel, + BytesInput bytes, + ColumnDescriptor descriptor) throws IOException { + try { + if (maxLevel == 0) { + return new NullIntIterator(); + } + return new RLEIntIterator( + new RunLengthBitPackingHybridDecoder(BytesUtils.getWidthFromMaxInt(maxLevel), + new ByteArrayInputStream(bytes.toByteArray()))); + } catch (IOException e) { + throw new IOException("could not read levels in page for col " + descriptor, e); + } + } + + + /** + * Utility classes to abstract over different way to read ints with different encodings. + * TODO: remove this layer of abstraction? + */ + abstract static class IntIterator { + abstract int nextInt(); + } + + protected static final class ValuesReaderIntIterator extends IntIterator { + ValuesReader delegate; + + public ValuesReaderIntIterator(ValuesReader delegate) { + this.delegate = delegate; + } + + @Override + int nextInt() { + return delegate.readInteger(); + } + } + + protected static final class RLEIntIterator extends IntIterator { + RunLengthBitPackingHybridDecoder delegate; + + public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) { + this.delegate = delegate; + } + + @Override + int nextInt() { + try { + return delegate.readInt(); + } catch (IOException e) { + throw new ParquetDecodingException(e); + } + } + } + + protected static final class NullIntIterator extends IntIterator { + @Override + int nextInt() { return 0; } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java new file mode 100644 index 0000000..bc3fcfb --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -0,0 +1,346 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.AbstractParquetRecordReader; +import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetInputSplit; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.range; +import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; +import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; + +/** + * This reader reads a batch of records from an InputSplit; parts of the code are adapted + * from Apache Spark and Apache Parquet. + */ +public class VectorizedParquetRecordReader extends AbstractParquetRecordReader + implements RecordReader { + public static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetRecordReader.class); + + private List colsToInclude; + + protected MessageType fileSchema; + protected MessageType requestedSchema; + List columnNamesList; + List columnTypesList; + + private VectorizedRowBatchCtx rbCtx; + + /** + * For each requested column, the reader to read this column. This is NULL if this column + * is missing from the file, in which case we populate the attribute with NULL. + */ + private VectorizedColumnReader[] columnReaders; + + /** + * The number of rows that have been returned. + */ + private long rowsReturned; + + /** + * The number of rows that have been loaded so far, including the current in-flight row group. + */ + private long totalCountLoadedSoFar = 0; + + /** + * The total number of rows this RecordReader will eventually read. The sum of the + * rows of all the row groups. 
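+   * Computed in initialize() by summing the row counts of the selected row groups.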
+ */ + protected long totalRowCount; + + public VectorizedParquetRecordReader( + org.apache.hadoop.mapred.InputSplit oldInputSplit, + JobConf conf) { + try { + serDeStats = new SerDeStats(); + projectionPusher = new ProjectionPusher(); + initialize(oldInputSplit, conf); + colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf); + rbCtx = Utilities.getVectorizedRowBatchCtx(conf); + } catch (Throwable e) { + LOG.error("Failed to create the vectorized reader due to exception " + e); + throw new RuntimeException(e); + } + } + + public VectorizedParquetRecordReader( + InputSplit inputSplit, + JobConf conf) { + try { + serDeStats = new SerDeStats(); + projectionPusher = new ProjectionPusher(); + initialize(inputSplit, conf); + colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf); + rbCtx = new VectorizedRowBatchCtx(); + rbCtx.init(createStructObjectInspector(), new String[0]); + } catch (Throwable e) { + LOG.error("Failed to create the vectorized reader due to exception " + e); + throw new RuntimeException(e); + } + } + + private StructObjectInspector createStructObjectInspector() { + // Create row related objects + TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList); + return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo); + } + + + public void initialize( + org.apache.hadoop.mapred.InputSplit oldInputSplit, + JobConf configuration) throws IOException, InterruptedException { + initialize(getSplit(oldInputSplit, configuration), configuration); + } + + public void initialize( + InputSplit oldSplit, + JobConf configuration) throws IOException, InterruptedException { + jobConf = configuration; + ParquetMetadata footer; + List blocks; + ParquetInputSplit split = (ParquetInputSplit) oldSplit; + boolean indexAccess = + configuration.getBoolean(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS, false); + this.file = split.getPath(); + long[] rowGroupOffsets = split.getRowGroupOffsets(); + + String columnNames = configuration.get(IOConstants.COLUMNS); + columnNamesList = DataWritableReadSupport.getColumnNames(columnNames); + String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES); + columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes); + + // if task.side.metadata is set, rowGroupOffsets is null + if (rowGroupOffsets == null) { + // then we need to apply the predicate push down filter + footer = readFooter(configuration, file, range(split.getStart(), split.getEnd())); + MessageType fileSchema = footer.getFileMetaData().getSchema(); + FilterCompat.Filter filter = getFilter(configuration); + blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema); + } else { + // otherwise we find the row groups that were selected on the client + footer = readFooter(configuration, file, NO_FILTER); + Set offsets = new HashSet<>(); + for (long offset : rowGroupOffsets) { + offsets.add(offset); + } + blocks = new ArrayList<>(); + for (BlockMetaData block : footer.getBlocks()) { + if (offsets.contains(block.getStartingPos())) { + blocks.add(block); + } + } + // verify we found them all + if (blocks.size() != rowGroupOffsets.length) { + long[] foundRowGroupOffsets = new long[footer.getBlocks().size()]; + for (int i = 0; i < foundRowGroupOffsets.length; i++) { + foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos(); + } + // this should never happen. + // provide a good error message in case there's a bug + throw new IllegalStateException( + "All the offsets listed in the split should be found in the file." 
+ + " expected: " + Arrays.toString(rowGroupOffsets) + + " found: " + blocks + + " out of: " + Arrays.toString(foundRowGroupOffsets) + + " in range " + split.getStart() + ", " + split.getEnd()); + } + } + + for (BlockMetaData block : blocks) { + this.totalRowCount += block.getRowCount(); + } + this.fileSchema = footer.getFileMetaData().getSchema(); + + + MessageType tableSchema; + if (indexAccess) { + List indexSequence = new ArrayList(); + + // Generates a sequence list of indexes + for(int i = 0; i < columnNamesList.size(); i++) { + indexSequence.add(i); + } + + tableSchema = DataWritableReadSupport.getSchemaByIndex(fileSchema, columnNamesList, + indexSequence); + } else { + tableSchema = DataWritableReadSupport.getSchemaByName(fileSchema, columnNamesList, + columnTypesList); + } +// this.hiveTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList); + + List indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration); + if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) { + requestedSchema = DataWritableReadSupport.getSchemaByIndex(tableSchema, columnNamesList, + indexColumnsWanted); + }else{ + requestedSchema = fileSchema; + } + + this.reader = new ParquetFileReader( + configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns()); + } + + /** + * columnBatch object that is used for batch decoding. This is created on first use and triggers + * batched decoding. It is not valid to interleave calls to the batched interface with the row + * by row RecordReader APIs. + * This is only enabled with additional flags for development. This is still a work in progress + * and currently unsupported cases will fail with potentially difficult to diagnose errors. + * This should be only turned on for development to work on this feature. + * + * When this is set, the code will branch early on in the RecordReader APIs. There is no shared + * code between the path that uses the MR decoders and the vectorized ones. + * + * TODOs: + * - Implement v2 page formats (just make sure we create the correct decoders). + */ + private VectorizedRowBatch columnarBatch; + + @Override + public boolean next( + NullWritable nullWritable, + VectorizedRowBatch vectorizedRowBatch) throws IOException { + columnarBatch = vectorizedRowBatch; + return nextBatch(); + } + + /** + * Advances to the next batch of rows. Returns false if there are no more. 
+ */ + private boolean nextBatch() throws IOException { + initRowBatch(); + columnarBatch.reset(); + if (rowsReturned >= totalRowCount) { + return false; + } + checkEndOfRowGroup(); + + int num = (int) Math.min(VectorizedRowBatch.DEFAULT_SIZE, totalCountLoadedSoFar - rowsReturned); + for (int i = 0; i < columnReaders.length; ++i) { + if (columnReaders[i] == null) { + continue; + } + columnarBatch.cols[colsToInclude.get(i)].isRepeating = true; + columnReaders[i].readBatch(num, columnarBatch.cols[colsToInclude.get(i)], + columnTypesList.get(colsToInclude.get(i))); + } + rowsReturned += num; + columnarBatch.size = num; + return true; + } + + private void initRowBatch() { + if (columnarBatch == null) { + if (rbCtx != null) { + columnarBatch = rbCtx.createVectorizedRowBatch(); + } else { + // test only + rbCtx = new VectorizedRowBatchCtx(); + try { + rbCtx.init(createStructObjectInspector(), new String[0]); + } catch (HiveException e) { + e.printStackTrace(); + } + } + } + } + + private void checkEndOfRowGroup() throws IOException { + if (rowsReturned != totalCountLoadedSoFar) { + return; + } + PageReadStore pages = reader.readNextRowGroup(); + if (pages == null) { + throw new IOException("expecting more rows but reached last block. Read " + + rowsReturned + " out of " + totalRowCount); + } + List columns = requestedSchema.getColumns(); + List types = requestedSchema.getFields(); + columnReaders = new VectorizedColumnReader[columns.size()]; + for (int i = 0; i < columns.size(); ++i) { + columnReaders[i] = + new VectorizedColumnReader(columns.get(i), pages.getPageReader(columns.get(i)), + skipTimestampConversion, types.get(i)); + } + totalCountLoadedSoFar += pages.getRowCount(); + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public VectorizedRowBatch createValue() { + initRowBatch(); + return columnarBatch; + } + + @Override + public long getPos() throws IOException { + //TODO + return 0; + } + + @Override + public void close() throws IOException { + if (columnarBatch != null) { + columnarBatch = null; + } + } + + @Override + public float getProgress() throws IOException { + //TODO + return 0; + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java new file mode 100644 index 0000000..6d95780 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java @@ -0,0 +1,382 @@ +package org.apache.hadoop.hive.ql.io.parquet; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetInputFormat; +import 
org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Random; + +import static junit.framework.Assert.assertTrue; +import static junit.framework.TestCase.assertFalse; +import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0; +import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; +import static org.junit.Assert.assertEquals; + +public class TestVectorizedColumnReader { + + private static final int nElements = 2500; + protected static final Configuration conf = new Configuration(); + protected static final Path file = + new Path("target/test/TestParquetVectorReader/testParquetFile"); + private static String[] uniqueStrs = new String[nElements]; + private static boolean[] isNulls = new boolean[nElements]; + private static Random random = new Random(); + protected static final MessageType schema = parseMessageType( + "message test { " + + "required int32 int32_field; " + + "required int64 int64_field; " + + "required int96 int96_field; " + + "required double double_field; " + + "required float float_field; " + + "required boolean boolean_field; " + + "required fixed_len_byte_array(3) flba_field; " + + "optional fixed_len_byte_array(1) some_null_field; " + + "optional fixed_len_byte_array(1) all_null_field; " + + "optional binary binary_field; " + + "optional binary binary_field_non_repeating; " + + "} "); + + @AfterClass + public static void cleanup() throws IOException { + FileSystem fs = file.getFileSystem(conf); + if (fs.exists(file)) { + fs.delete(file, true); + } + } + + @BeforeClass + public static void prepareFile() throws IOException { + cleanup(); + + boolean dictionaryEnabled = true; + boolean validating = false; + GroupWriteSupport.setSchema(schema, conf); + SimpleGroupFactory f = new SimpleGroupFactory(schema); + ParquetWriter writer = new ParquetWriter( + file, + new GroupWriteSupport(), + GZIP, 1024*1024, 1024, 1024*1024, + dictionaryEnabled, validating, PARQUET_1_0, conf); + writeData(f, writer); + } + + protected static void writeData(SimpleGroupFactory f, ParquetWriter writer) throws IOException { + initialStrings(uniqueStrs); + for (int i = 0; i < nElements; i++) { + Group group = f.newGroup() + .append("int32_field", i) + .append("int64_field", (long) 2 * i) + .append("int96_field", Binary.fromReusedByteArray("999999999999".getBytes())) + .append("double_field", i * 1.0) + .append("float_field", ((float) (i * 2.0))) + .append("boolean_field", i % 5 == 0) + .append("flba_field", "abc"); + + if (i % 2 == 1) { + group.append("some_null_field", "x"); + } + + if (i % 13 != 1) { + int binaryLen = i % 10; + group.append("binary_field", + Binary.fromString(new String(new char[binaryLen]).replace("\0", "x"))); + } + + if (uniqueStrs[i] != null) { + group.append("binary_field_non_repeating", Binary.fromString(uniqueStrs[i])); + } + writer.write(group); + } + writer.close(); + } + + private static String getRandomStr() { + int len = random.nextInt(10); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < len; i++) { + sb.append((char) ('a' + 
random.nextInt(25))); + } + return sb.toString(); + } + + public static void initialStrings(String[] uniqueStrs) { + int nulls = 0; + for (int i = 0; i < uniqueStrs.length; i++) { + String str = getRandomStr(); + if (!str.isEmpty()) { + uniqueStrs[i] = str; + isNulls[i] = false; + }else{ + isNulls[i] = true; + } + } + } + + private VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf) + throws IOException, InterruptedException { + conf.set(PARQUET_READ_SCHEMA, schemaString); + Job vectorJob = new Job(conf, "read vector"); + ParquetInputFormat.setInputPaths(vectorJob, file); + ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class); + InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0); + return new VectorizedParquetRecordReader(split, new JobConf(conf)); + } + + @Test + public void testIntRead() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"int32_field"); + conf.set(IOConstants.COLUMNS_TYPES,"int"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required int32 int32_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + long c = 0; + while (reader.next(NullWritable.get(), previous)) { + LongColumnVector vector = (LongColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + assertEquals(c, vector.vector[i]); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + @Test + public void testLongRead() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"int64_field"); + conf.set(IOConstants.COLUMNS_TYPES, "bigint"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required int64 int64_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + long c = 0; + while (reader.next(NullWritable.get(), previous)) { + LongColumnVector vector = (LongColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + assertEquals(2 * c, vector.vector[i]); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + @Test + public void testDoubleRead() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"double_field"); + conf.set(IOConstants.COLUMNS_TYPES, "double"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required double double_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + long c = 0; + while (reader.next(NullWritable.get(), previous)) { + DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + assertEquals(1.0 * c, vector.vector[i], 0); + assertFalse(vector.isNull[i]); + c++; + } + } + 
assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + @Test + public void testFloatRead() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"float_field"); + conf.set(IOConstants.COLUMNS_TYPES, "float"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required float float_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + long c = 0; + while (reader.next(NullWritable.get(), previous)) { + DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + assertEquals((float)2.0 * c, vector.vector[i], 0); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + @Test + public void testBooleanRead() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"boolean_field"); + conf.set(IOConstants.COLUMNS_TYPES, "boolean"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required boolean boolean_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + long c = 0; + while (reader.next(NullWritable.get(), previous)) { + LongColumnVector vector = (LongColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + int e = (c % 5 == 0) ? 
1 : 0; + assertEquals(e, vector.vector[i]); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + @Test + public void testBinaryReadDictionaryEncoding() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"binary_field"); + conf.set(IOConstants.COLUMNS_TYPES, "string"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required binary binary_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + int c = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + BytesColumnVector vector = (BytesColumnVector) previous.cols[0]; + boolean noNull = true; + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + if (c % 13 == 1) { + assertTrue(vector.isNull[i]); + } else { + assertFalse(vector.isNull[i]); + int binaryLen = c % 10; + String expected = new String(new char[binaryLen]).replace("\0", "x"); + String actual = new String(ArrayUtils + .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i])); + assertEquals("Failed at " + c, expected, actual); + noNull = false; + } + c++; + } + assertEquals("No Null check failed at " + c, noNull, vector.noNulls); + assertFalse(vector.isRepeating); + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + @Test + public void testBinaryRead() throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"binary_field_non_repeating"); + conf.set(IOConstants.COLUMNS_TYPES, "string"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required binary binary_field_non_repeating;}", conf); + VectorizedRowBatch previous = reader.createValue(); + int c = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + BytesColumnVector vector = (BytesColumnVector) previous.cols[0]; + boolean noNull = true; + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + String actual; + assertEquals("Null assert failed at " + c, isNulls[c], vector.isNull[i]); + if (!vector.isNull[i]) { + actual = new String(ArrayUtils + .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i])); + assertEquals("failed at " + c, uniqueStrs[c], actual); + }else{ + noNull = false; + } + c++; + } + assertEquals("No Null check failed at " + c, noNull, vector.noNulls); + assertFalse(vector.isRepeating); + } + assertEquals("It doesn't exit at expected position", nElements, c); + } finally { + reader.close(); + } + + } +} diff --git ql/src/test/queries/clientpositive/parquet_types_vectorization.q ql/src/test/queries/clientpositive/parquet_types_vectorization.q new file mode 100644 index 0000000..d877b00 --- /dev/null +++ ql/src/test/queries/clientpositive/parquet_types_vectorization.q @@ -0,0 +1,84 @@ +set hive.mapred.mode=nonstrict; +DROP TABLE parquet_types_staging; +DROP TABLE parquet_types; + +set hive.vectorized.execution.enabled=true; +set hive.vectorized.execution.reduce.enabled=true; +set hive.vectorized.use.row.serde.deserialize=true; +set hive.vectorized.use.vector.serde.deserialize=true; +set hive.vectorized.execution.reduce.groupby.enabled = true; + +CREATE TABLE 
parquet_types_staging ( + cint int, + ctinyint tinyint, + csmallint smallint, + cfloat float, + cdouble double, + cstring1 string, + t timestamp, + cchar char(5), + cvarchar varchar(10), + cbinary string, + m1 map, + l1 array, + st1 struct, + d date +) ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +COLLECTION ITEMS TERMINATED BY ',' +MAP KEYS TERMINATED BY ':'; + +CREATE TABLE parquet_types ( + cint int, + ctinyint tinyint, + csmallint smallint, + cfloat float, + cdouble double, + cstring1 string, + t timestamp, + cchar char(5), + cvarchar varchar(10), + cbinary binary, + m1 map, + l1 array, + st1 struct, + d date +) STORED AS PARQUET; + +LOAD DATA LOCAL INPATH '../../data/files/parquet_types.txt' OVERWRITE INTO TABLE parquet_types_staging; + +SELECT * FROM parquet_types_staging; + +INSERT OVERWRITE TABLE parquet_types +SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar, +unhex(cbinary), m1, l1, st1, d FROM parquet_types_staging; + +SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar, +hex(cbinary), m1, l1, st1, d FROM parquet_types; + +SELECT cchar, LENGTH(cchar), cvarchar, LENGTH(cvarchar) FROM parquet_types; + +-- test types in group by + +SELECT ctinyint, + MAX(cint), + MIN(csmallint), + COUNT(cstring1), + ROUND(AVG(cfloat), 5), + ROUND(STDDEV_POP(cdouble),5) +FROM parquet_types +GROUP BY ctinyint +ORDER BY ctinyint +; + +SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat; + +SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar; + +SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar; + +SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1; + +SELECT t, count(*) FROM parquet_types GROUP BY t ORDER BY t; + +SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary; \ No newline at end of file diff --git ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out index 8345132..e42453d 100644 --- ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out +++ ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out @@ -150,7 +150,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: tinyint) Statistics: Num rows: 12288 Data size: 73728 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: int), _col2 (type: smallint), _col3 (type: bigint), _col4 (type: struct), _col5 (type: struct) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: no inputs Reducer 2 Execution mode: llap diff --git ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out index b49d5dd..0524cb3 100644 --- ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out +++ ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out @@ -250,19 +250,19 @@ Stage-0 limit:-1 Stage-1 Reducer 3 vectorized, llap - File Output Operator [FS_10] - Select Operator [SEL_9] (rows=11 width=11) + File Output Operator [FS_12] + Select Operator [SEL_11] (rows=11 width=11) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6"] <-Reducer 2 [SIMPLE_EDGE] llap SHUFFLE [RS_6] Group By Operator [GBY_4] (rows=11 width=11) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6"],aggregations:["max(VALUE._col0)","min(VALUE._col1)","count(VALUE._col2)","avg(VALUE._col3)","stddev_pop(VALUE._col4)","max(VALUE._col5)"],keys:KEY._col0 - <-Map 1 
[SIMPLE_EDGE] llap + <-Map 1 [SIMPLE_EDGE] vectorized, llap SHUFFLE [RS_3] PartitionCols:_col0 - Group By Operator [GBY_2] (rows=22 width=11) + Group By Operator [GBY_10] (rows=22 width=11) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6"],aggregations:["max(cint)","min(csmallint)","count(cstring1)","avg(cfloat)","stddev_pop(cdouble)","max(cdecimal)"],keys:ctinyint - Select Operator [SEL_1] (rows=22 width=11) + Select Operator [SEL_9] (rows=22 width=11) Output:["ctinyint","cint","csmallint","cstring1","cfloat","cdouble","cdecimal"] TableScan [TS_0] (rows=22 width=11) default@parquet_types,parquet_types,Tbl:COMPLETE,Col:NONE,Output:["cint","ctinyint","csmallint","cfloat","cdouble","cstring1","cdecimal"] diff --git ql/src/test/results/clientpositive/parquet_types_vectorization.q.out ql/src/test/results/clientpositive/parquet_types_vectorization.q.out new file mode 100644 index 0000000..ad743ef --- /dev/null +++ ql/src/test/results/clientpositive/parquet_types_vectorization.q.out @@ -0,0 +1,379 @@ +PREHOOK: query: DROP TABLE parquet_types_staging +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE parquet_types_staging +POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE parquet_types +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE parquet_types +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE parquet_types_staging ( + cint int, + ctinyint tinyint, + csmallint smallint, + cfloat float, + cdouble double, + cstring1 string, + t timestamp, + cchar char(5), + cvarchar varchar(10), + cbinary string, + m1 map, + l1 array, + st1 struct, + d date +) ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +COLLECTION ITEMS TERMINATED BY ',' +MAP KEYS TERMINATED BY ':' +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@parquet_types_staging +POSTHOOK: query: CREATE TABLE parquet_types_staging ( + cint int, + ctinyint tinyint, + csmallint smallint, + cfloat float, + cdouble double, + cstring1 string, + t timestamp, + cchar char(5), + cvarchar varchar(10), + cbinary string, + m1 map, + l1 array, + st1 struct, + d date +) ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +COLLECTION ITEMS TERMINATED BY ',' +MAP KEYS TERMINATED BY ':' +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@parquet_types_staging +PREHOOK: query: CREATE TABLE parquet_types ( + cint int, + ctinyint tinyint, + csmallint smallint, + cfloat float, + cdouble double, + cstring1 string, + t timestamp, + cchar char(5), + cvarchar varchar(10), + cbinary binary, + m1 map, + l1 array, + st1 struct, + d date +) STORED AS PARQUET +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@parquet_types +POSTHOOK: query: CREATE TABLE parquet_types ( + cint int, + ctinyint tinyint, + csmallint smallint, + cfloat float, + cdouble double, + cstring1 string, + t timestamp, + cchar char(5), + cvarchar varchar(10), + cbinary binary, + m1 map, + l1 array, + st1 struct, + d date +) STORED AS PARQUET +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@parquet_types +PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/parquet_types.txt' OVERWRITE INTO TABLE parquet_types_staging +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@parquet_types_staging +POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/parquet_types.txt' OVERWRITE INTO TABLE parquet_types_staging +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: 
default@parquet_types_staging +PREHOOK: query: SELECT * FROM parquet_types_staging +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types_staging +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM parquet_types_staging +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types_staging +#### A masked pattern was here #### +100 1 1 1.0 0.0 abc 2011-01-01 01:01:01.111111111 a a B4F3CAFDBEDD {"k1":"v1"} [101,200] {"c1":10,"c2":"a"} 2011-01-01 +101 2 2 1.1 0.3 def 2012-02-02 02:02:02.222222222 ab ab 68692CCAC0BDE7 {"k2":"v2"} [102,200] {"c1":10,"c2":"d"} 2012-02-02 +102 3 3 1.2 0.6 ghi 2013-03-03 03:03:03.333333333 abc abc B4F3CAFDBEDD {"k3":"v3"} [103,200] {"c1":10,"c2":"g"} 2013-03-03 +103 1 4 1.3 0.9 jkl 2014-04-04 04:04:04.444444444 abcd abcd 68692CCAC0BDE7 {"k4":"v4"} [104,200] {"c1":10,"c2":"j"} 2014-04-04 +104 2 5 1.4 1.2 mno 2015-05-05 05:05:05.555555555 abcde abcde B4F3CAFDBEDD {"k5":"v5"} [105,200] {"c1":10,"c2":"m"} 2015-05-05 +105 3 1 1.0 1.5 pqr 2016-06-06 06:06:06.666666666 abcde abcdef 68692CCAC0BDE7 {"k6":"v6"} [106,200] {"c1":10,"c2":"p"} 2016-06-06 +106 1 2 1.1 1.8 stu 2017-07-07 07:07:07.777777777 abcde abcdefg B4F3CAFDBEDD {"k7":"v7"} [107,200] {"c1":10,"c2":"s"} 2017-07-07 +107 2 3 1.2 2.1 vwx 2018-08-08 08:08:08.888888888 bcdef abcdefgh 68692CCAC0BDE7 {"k8":"v8"} [108,200] {"c1":10,"c2":"v"} 2018-08-08 +108 3 4 1.3 2.4 yza 2019-09-09 09:09:09.999999999 cdefg B4F3CAFDBE 68656C6C6F {"k9":"v9"} [109,200] {"c1":10,"c2":"y"} 2019-09-09 +109 1 5 1.4 2.7 bcd 2020-10-10 10:10:10.101010101 klmno abcdedef 68692CCAC0BDE7 {"k10":"v10"} [110,200] {"c1":10,"c2":"b"} 2020-10-10 +110 2 1 1.0 3.0 efg 2021-11-11 11:11:11.111111111 pqrst abcdede B4F3CAFDBEDD {"k11":"v11"} [111,200] {"c1":10,"c2":"e"} 2021-11-11 +111 3 2 1.1 3.3 hij 2022-12-12 12:12:12.121212121 nopqr abcded 68692CCAC0BDE7 {"k12":"v12"} [112,200] {"c1":10,"c2":"h"} 2022-12-12 +112 1 3 1.2 3.6 klm 2023-01-02 13:13:13.131313131 opqrs abcdd B4F3CAFDBEDD {"k13":"v13"} [113,200] {"c1":10,"c2":"k"} 2023-01-02 +113 2 4 1.3 3.9 nop 2024-02-02 14:14:14.141414141 pqrst abc 68692CCAC0BDE7 {"k14":"v14"} [114,200] {"c1":10,"c2":"n"} 2024-02-02 +114 3 5 1.4 4.2 qrs 2025-03-03 15:15:15.151515151 qrstu b B4F3CAFDBEDD {"k15":"v15"} [115,200] {"c1":10,"c2":"q"} 2025-03-03 +115 1 1 1.0 4.5 qrs 2026-04-04 16:16:16.161616161 rstuv abcded 68692CCAC0BDE7 {"k16":"v16"} [116,200] {"c1":10,"c2":"q"} 2026-04-04 +116 2 2 1.1 4.8 wxy 2027-05-05 17:17:17.171717171 stuvw abcded B4F3CAFDBEDD {"k17":"v17"} [117,200] {"c1":10,"c2":"w"} 2027-05-05 +117 3 3 1.2 5.1 zab 2028-06-06 18:18:18.181818181 tuvwx abcded 68692CCAC0BDE7 {"k18":"v18"} [118,200] {"c1":10,"c2":"z"} 2028-06-06 +118 1 4 1.3 5.4 cde 2029-07-07 19:19:19.191919191 uvwzy abcdede B4F3CAFDBEDD {"k19":"v19"} [119,200] {"c1":10,"c2":"c"} 2029-07-07 +119 2 5 1.4 5.7 fgh 2030-08-08 20:20:20.202020202 vwxyz abcdede 68692CCAC0BDE7 {"k20":"v20"} [120,200] {"c1":10,"c2":"f"} 2030-08-08 +120 3 1 1.0 6.0 ijk 2031-09-09 21:21:21.212121212 wxyza abcde B4F3CAFDBEDD {"k21":"v21"} [121,200] {"c1":10,"c2":"i"} 2031-09-09 +121 1 2 1.1 6.3 lmn 2032-10-10 22:22:22.222222222 bcdef abcde {"k22":"v22"} [122,200] {"c1":10,"c2":"l"} 2032-10-10 +PREHOOK: query: INSERT OVERWRITE TABLE parquet_types +SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar, +unhex(cbinary), m1, l1, st1, d FROM parquet_types_staging +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types_staging +PREHOOK: Output: default@parquet_types +POSTHOOK: query: INSERT OVERWRITE TABLE parquet_types 
+SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar, +unhex(cbinary), m1, l1, st1, d FROM parquet_types_staging +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types_staging +POSTHOOK: Output: default@parquet_types +POSTHOOK: Lineage: parquet_types.cbinary EXPRESSION [(parquet_types_staging)parquet_types_staging.FieldSchema(name:cbinary, type:string, comment:null), ] +POSTHOOK: Lineage: parquet_types.cchar SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:cchar, type:char(5), comment:null), ] +POSTHOOK: Lineage: parquet_types.cdouble SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: parquet_types.cfloat SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: parquet_types.cint SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: parquet_types.csmallint SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:csmallint, type:smallint, comment:null), ] +POSTHOOK: Lineage: parquet_types.cstring1 SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:cstring1, type:string, comment:null), ] +POSTHOOK: Lineage: parquet_types.ctinyint SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:ctinyint, type:tinyint, comment:null), ] +POSTHOOK: Lineage: parquet_types.cvarchar SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:cvarchar, type:varchar(10), comment:null), ] +POSTHOOK: Lineage: parquet_types.d SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:d, type:date, comment:null), ] +POSTHOOK: Lineage: parquet_types.l1 SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:l1, type:array, comment:null), ] +POSTHOOK: Lineage: parquet_types.m1 SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:m1, type:map, comment:null), ] +POSTHOOK: Lineage: parquet_types.st1 SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:st1, type:struct, comment:null), ] +POSTHOOK: Lineage: parquet_types.t SIMPLE [(parquet_types_staging)parquet_types_staging.FieldSchema(name:t, type:timestamp, comment:null), ] +PREHOOK: query: SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar, +hex(cbinary), m1, l1, st1, d FROM parquet_types +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar, +hex(cbinary), m1, l1, st1, d FROM parquet_types +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +100 1 1 1.0 0.0 abc 2011-01-01 01:01:01.111111111 a a B4F3CAFDBEDD {"k1":"v1"} [101,200] {"c1":10,"c2":"a"} 2011-01-01 +101 2 2 1.1 0.3 def 2012-02-02 02:02:02.222222222 ab ab 68692CCAC0BDE7 {"k2":"v2"} [102,200] {"c1":10,"c2":"d"} 2012-02-02 +102 3 3 1.2 0.6 ghi 2013-03-03 03:03:03.333333333 abc abc B4F3CAFDBEDD {"k3":"v3"} [103,200] {"c1":10,"c2":"g"} 2013-03-03 +103 1 4 1.3 0.9 jkl 2014-04-04 04:04:04.444444444 abcd abcd 68692CCAC0BDE7 {"k4":"v4"} [104,200] {"c1":10,"c2":"j"} 2014-04-04 +104 2 5 1.4 1.2 mno 2015-05-05 05:05:05.555555555 abcde abcde B4F3CAFDBEDD {"k5":"v5"} [105,200] {"c1":10,"c2":"m"} 2015-05-05 +105 3 1 1.0 1.5 pqr 2016-06-06 06:06:06.666666666 abcde abcdef 68692CCAC0BDE7 {"k6":"v6"} [106,200] 
{"c1":10,"c2":"p"} 2016-06-06 +106 1 2 1.1 1.8 stu 2017-07-07 07:07:07.777777777 abcde abcdefg B4F3CAFDBEDD {"k7":"v7"} [107,200] {"c1":10,"c2":"s"} 2017-07-07 +107 2 3 1.2 2.1 vwx 2018-08-08 08:08:08.888888888 bcdef abcdefgh 68692CCAC0BDE7 {"k8":"v8"} [108,200] {"c1":10,"c2":"v"} 2018-08-08 +108 3 4 1.3 2.4 yza 2019-09-09 09:09:09.999999999 cdefg B4F3CAFDBE 68656C6C6F {"k9":"v9"} [109,200] {"c1":10,"c2":"y"} 2019-09-09 +109 1 5 1.4 2.7 bcd 2020-10-10 10:10:10.101010101 klmno abcdedef 68692CCAC0BDE7 {"k10":"v10"} [110,200] {"c1":10,"c2":"b"} 2020-10-10 +110 2 1 1.0 3.0 efg 2021-11-11 11:11:11.111111111 pqrst abcdede B4F3CAFDBEDD {"k11":"v11"} [111,200] {"c1":10,"c2":"e"} 2021-11-11 +111 3 2 1.1 3.3 hij 2022-12-12 12:12:12.121212121 nopqr abcded 68692CCAC0BDE7 {"k12":"v12"} [112,200] {"c1":10,"c2":"h"} 2022-12-12 +112 1 3 1.2 3.6 klm 2023-01-02 13:13:13.131313131 opqrs abcdd B4F3CAFDBEDD {"k13":"v13"} [113,200] {"c1":10,"c2":"k"} 2023-01-02 +113 2 4 1.3 3.9 nop 2024-02-02 14:14:14.141414141 pqrst abc 68692CCAC0BDE7 {"k14":"v14"} [114,200] {"c1":10,"c2":"n"} 2024-02-02 +114 3 5 1.4 4.2 qrs 2025-03-03 15:15:15.151515151 qrstu b B4F3CAFDBEDD {"k15":"v15"} [115,200] {"c1":10,"c2":"q"} 2025-03-03 +115 1 1 1.0 4.5 qrs 2026-04-04 16:16:16.161616161 rstuv abcded 68692CCAC0BDE7 {"k16":"v16"} [116,200] {"c1":10,"c2":"q"} 2026-04-04 +116 2 2 1.1 4.8 wxy 2027-05-05 17:17:17.171717171 stuvw abcded B4F3CAFDBEDD {"k17":"v17"} [117,200] {"c1":10,"c2":"w"} 2027-05-05 +117 3 3 1.2 5.1 zab 2028-06-06 18:18:18.181818181 tuvwx abcded 68692CCAC0BDE7 {"k18":"v18"} [118,200] {"c1":10,"c2":"z"} 2028-06-06 +118 1 4 1.3 5.4 cde 2029-07-07 19:19:19.191919191 uvwzy abcdede B4F3CAFDBEDD {"k19":"v19"} [119,200] {"c1":10,"c2":"c"} 2029-07-07 +119 2 5 1.4 5.7 fgh 2030-08-08 20:20:20.202020202 vwxyz abcdede 68692CCAC0BDE7 {"k20":"v20"} [120,200] {"c1":10,"c2":"f"} 2030-08-08 +120 3 1 1.0 6.0 ijk 2031-09-09 21:21:21.212121212 wxyza abcde B4F3CAFDBEDD {"k21":"v21"} [121,200] {"c1":10,"c2":"i"} 2031-09-09 +121 1 2 1.1 6.3 lmn 2032-10-10 22:22:22.222222222 bcdef abcde {"k22":"v22"} [122,200] {"c1":10,"c2":"l"} 2032-10-10 +PREHOOK: query: SELECT cchar, LENGTH(cchar), cvarchar, LENGTH(cvarchar) FROM parquet_types +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT cchar, LENGTH(cchar), cvarchar, LENGTH(cvarchar) FROM parquet_types +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +a 1 a 3 +ab 2 ab 3 +abc 3 abc 3 +abcd 4 abcd 4 +abcde 5 abcde 5 +abcde 5 abcdef 6 +abcde 5 abcdefg 7 +bcdef 5 abcdefgh 8 +cdefg 5 B4F3CAFDBE 10 +klmno 5 abcdedef 8 +pqrst 5 abcdede 7 +nopqr 5 abcded 6 +opqrs 5 abcdd 5 +pqrst 5 abc 3 +qrstu 5 b 1 +rstuv 5 abcded 6 +stuvw 5 abcded 6 +tuvwx 5 abcded 6 +uvwzy 5 abcdede 7 +vwxyz 5 abcdede 7 +wxyza 5 abcde 5 +bcdef 5 abcde 5 +PREHOOK: query: -- test types in group by + +SELECT ctinyint, + MAX(cint), + MIN(csmallint), + COUNT(cstring1), + ROUND(AVG(cfloat), 5), + ROUND(STDDEV_POP(cdouble),5) +FROM parquet_types +GROUP BY ctinyint +ORDER BY ctinyint +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: -- test types in group by + +SELECT ctinyint, + MAX(cint), + MIN(csmallint), + COUNT(cstring1), + ROUND(AVG(cfloat), 5), + ROUND(STDDEV_POP(cdouble),5) +FROM parquet_types +GROUP BY ctinyint +ORDER BY ctinyint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +1 121 1 8 1.175 2.06216 
+2 119 1 7 1.21429 1.8 +3 120 1 7 1.17143 1.8 +PREHOOK: query: SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +1.0 5 +1.1 5 +1.2 4 +1.3 4 +1.4 4 +PREHOOK: query: SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +a 1 +ab 1 +abc 1 +abcd 1 +abcde 3 +bcdef 2 +cdefg 1 +klmno 1 +nopqr 1 +opqrs 1 +pqrst 2 +qrstu 1 +rstuv 1 +stuvw 1 +tuvwx 1 +uvwzy 1 +vwxyz 1 +wxyza 1 +PREHOOK: query: SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +B4F3CAFDBE 1 +a 1 +ab 1 +abc 2 +abcd 1 +abcdd 1 +abcde 3 +abcded 4 +abcdede 3 +abcdedef 1 +abcdef 1 +abcdefg 1 +abcdefgh 1 +b 1 +PREHOOK: query: SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1 +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +abc 1 +bcd 1 +cde 1 +def 1 +efg 1 +fgh 1 +ghi 1 +hij 1 +ijk 1 +jkl 1 +klm 1 +lmn 1 +mno 1 +nop 1 +pqr 1 +qrs 2 +stu 1 +vwx 1 +wxy 1 +yza 1 +zab 1 +PREHOOK: query: SELECT t, count(*) FROM parquet_types GROUP BY t ORDER BY t +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT t, count(*) FROM parquet_types GROUP BY t ORDER BY t +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +2011-01-01 01:01:01.111111111 1 +2012-02-02 02:02:02.222222222 1 +2013-03-03 03:03:03.333333333 1 +2014-04-04 04:04:04.444444444 1 +2015-05-05 05:05:05.555555555 1 +2016-06-06 06:06:06.666666666 1 +2017-07-07 07:07:07.777777777 1 +2018-08-08 08:08:08.888888888 1 +2019-09-09 09:09:09.999999999 1 +2020-10-10 10:10:10.101010101 1 +2021-11-11 11:11:11.111111111 1 +2022-12-12 12:12:12.121212121 1 +2023-01-02 13:13:13.131313131 1 +2024-02-02 14:14:14.141414141 1 +2025-03-03 15:15:15.151515151 1 +2026-04-04 16:16:16.161616161 1 +2027-05-05 17:17:17.171717171 1 +2028-06-06 18:18:18.181818181 1 +2029-07-07 19:19:19.191919191 1 +2030-08-08 20:20:20.202020202 1 +2031-09-09 21:21:21.212121212 1 +2032-10-10 22:22:22.222222222 1 +PREHOOK: query: SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### + 1 +68656C6C6F 1 +68692CCAC0BDE7 10 +B4F3CAFDBEDD 
10 diff --git ql/src/test/results/clientpositive/vectorized_parquet_types.q.out ql/src/test/results/clientpositive/vectorized_parquet_types.q.out index 281fe93..f493102 100644 --- ql/src/test/results/clientpositive/vectorized_parquet_types.q.out +++ ql/src/test/results/clientpositive/vectorized_parquet_types.q.out @@ -149,6 +149,7 @@ STAGE PLANS: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Execution mode: vectorized Stage: Stage-0 Fetch Operator @@ -216,6 +217,7 @@ STAGE PLANS: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Execution mode: vectorized Stage: Stage-0 Fetch Operator @@ -305,6 +307,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: tinyint) Statistics: Num rows: 22 Data size: 242 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: int), _col2 (type: smallint), _col3 (type: bigint), _col4 (type: struct), _col5 (type: struct), _col6 (type: decimal(4,2)) + Execution mode: vectorized Reduce Operator Tree: Group By Operator aggregations: max(VALUE._col0), min(VALUE._col1), count(VALUE._col2), avg(VALUE._col3), stddev_pop(VALUE._col4), max(VALUE._col5)