diff --git a/pom.xml b/pom.xml
index 1abf738..45649b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,7 +153,7 @@
     1.9.5
     2.0.0-M5
     4.0.23.Final
-    <parquet.version>1.7.0</parquet.version>
+    <parquet.version>1.8.0rc2-SNAPSHOT</parquet.version>
     0.12.0
     2.5.0
     1.0.1
diff --git a/ql/pom.xml b/ql/pom.xml
index 6026c49..4c61f60 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -321,6 +321,13 @@
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop-bundle</artifactId>
+      <classifier>tests</classifier>
+      <version>${parquet.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>${junit.version}</version>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
index e1b6dd8..2311c82 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
@@ -17,6 +17,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable;
@@ -30,11 +31,9 @@
 /**
  *
  * A Parquet InputFormat for Hive (with the deprecated package mapred)
- *
- * NOTE: With HIVE-9235 we removed "implements VectorizedParquetInputFormat" since all data types
- * are not currently supported. Removing the interface turns off vectorization.
  */
-public class MapredParquetInputFormat extends FileInputFormat<NullWritable, ObjectArrayWritable> {
+public class MapredParquetInputFormat extends FileInputFormat<NullWritable, ObjectArrayWritable>
+    implements VectorizedInputFormatInterface {
 
   private static final Log LOG = LogFactory.getLog(MapredParquetInputFormat.class);
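
For context on the MapredParquetInputFormat change above: Hive's planner decides whether a scan can run in vectorized mode by checking the input format against the VectorizedInputFormatInterface marker, which is why the removed NOTE said that dropping the interface turned vectorization off. A minimal, illustrative check (not part of the patch; it assumes hive-exec with this patch applied is on the classpath):

    import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
    import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;

    public class VectorizedMarkerCheck {
      public static void main(String[] args) {
        // Prints "true" with this patch applied; "false" against the previous code.
        System.out.println(
            VectorizedInputFormatInterface.class.isAssignableFrom(MapredParquetInputFormat.class));
      }
    }
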
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
index 98691c7..3b49ada 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
@@ -14,16 +14,29 @@
 package org.apache.hadoop.hive.ql.io.parquet;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Strings;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssign;
-import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssignFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -32,7 +45,22 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.ParquetRecordReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.vector.IntColumnVector;
+import org.apache.parquet.vector.RowBatch;
 
 /**
  * Vectorized input format for Parquet files
@@ -49,24 +77,61 @@ RecordReader {
 
     private static final Log LOG = LogFactory.getLog(VectorizedParquetRecordReader.class);
 
-    private final ParquetRecordReaderWrapper internalReader;
-    private VectorizedRowBatchCtx rbCtx;
-    private ObjectArrayWritable internalValues;
-    private NullWritable internalKey;
-    private VectorColumnAssign[] assigners;
+    private JobConf jobConf;
+    private final long splitLen;
+    private boolean skipTimestampConversion = false;
+    private final ProjectionPusher projectionPusher;
+
+    private ParquetRecordReader internalReader;
+    private RowBatch parquetRowBatch;
+    private VectorizedRowBatchCtx rbCtx;
+
+    // Column projection list of column indexes to include. It does not contain partition columns.
+    private List<Integer> colsToInclude;
+    // The Type of all the columns
+    private List<TypeInfo> columnTypes;
 
     public VectorizedParquetRecordReader(
-        ParquetInputFormat realInput,
-        FileSplit split,
-        JobConf conf, Reporter reporter) throws IOException, InterruptedException {
-      internalReader = new ParquetRecordReaderWrapper(
-        realInput,
-        split,
-        conf,
-        reporter);
+        ParquetInputFormat newInputFormat,
+        FileSplit oldSplit,
+        JobConf oldJobConf, Reporter reporter) throws IOException, InterruptedException {
+
+      // create internalReader, which is a Parquet vectorized reader
+      this.splitLen = oldSplit.getLength();
+      this.projectionPusher = new ProjectionPusher();
+
+      jobConf = oldJobConf;
+      final ParquetInputSplit split = getSplit(oldSplit, jobConf);
+
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get(IOConstants.MAPRED_TASK_ID));
+
+      if (taskAttemptID == null) {
+        taskAttemptID = new TaskAttemptID();
+      }
+
+      if (skipTimestampConversion ^ HiveConf.getBoolVar(
+          jobConf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
+        HiveConf.setBoolVar(jobConf,
+            HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION, skipTimestampConversion);
+      }
+
+      final TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
+
+      if (split != null) {
+        internalReader = (ParquetRecordReader) newInputFormat.createRecordReader(split,
+            taskContext);
+        internalReader.initialize(split, taskContext);
+        parquetRowBatch = new RowBatch();
+      }
+
+      // init vectorization related attributes
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(oldJobConf.get(IOConstants
+          .COLUMNS_TYPES));
+      colsToInclude = ColumnProjectionUtils.getReadColumnIDs(oldJobConf);
+
       try {
         rbCtx = new VectorizedRowBatchCtx();
-        rbCtx.init(conf, split);
+        rbCtx.init(oldJobConf, oldSplit);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -74,7 +139,6 @@ public VectorizedParquetRecordReader(
     @Override
     public NullWritable createKey() {
-      internalKey = internalReader.createKey();
       return NullWritable.get();
     }
@@ -83,7 +147,6 @@
     public VectorizedRowBatch createValue() {
       VectorizedRowBatch outputBatch = null;
       try {
         outputBatch = rbCtx.createVectorizedRowBatch();
-        internalValues = internalReader.createValue();
       } catch (HiveException e) {
         throw new RuntimeException("Error creating a batch", e);
       }
@@ -92,7 +155,8 @@
 
     @Override
     public long getPos() throws IOException {
-      return internalReader.getPos();
+      // this impl is copied from ParquetRecordReaderWrapper.
+      return (long) (splitLen * getProgress());
     }
 
     @Override
@@ -102,41 +166,174 @@ public void close() throws IOException {
 
     @Override
     public float getProgress() throws IOException {
-      return internalReader.getProgress();
+      if (internalReader == null) {
+        return 1f;
+      } else {
+        try {
+          return internalReader.getProgress();
+        } catch (final InterruptedException e) {
+          throw new IOException(e);
+        }
+      }
     }
 
     @Override
     public boolean next(NullWritable key, VectorizedRowBatch outputBatch) throws IOException {
-      if (assigners != null) {
-        assert(outputBatch.numCols == assigners.length);
-      }
+
       outputBatch.reset();
-      int maxSize = outputBatch.getMaxSize();
+
+      // Check whether it is the end of file
       try {
-        while (outputBatch.size < maxSize) {
-          if (false == internalReader.next(internalKey, internalValues)) {
-            outputBatch.endOfFile = true;
-            break;
+        if (internalReader.getProgress() == 1.0) {
+          outputBatch.endOfFile = true;
+          return false;
+        }
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+
+      // read a row batch from Parquet
+      parquetRowBatch = internalReader.nextBatch(parquetRowBatch);
+      assert (colsToInclude.size() == parquetRowBatch.getColumns().length);
+
+      org.apache.parquet.vector.ColumnVector[] columnVectors = parquetRowBatch.getColumns();
+      int parquetBatchLength = columnVectors[0].size();
+      assert (outputBatch.getMaxSize() >= parquetBatchLength);
+
+      // converting Parquet vector to Hive vector
+      for (int i = 0; i < parquetRowBatch.getColumns().length; i++) {
+        assignVector(outputBatch.cols[colsToInclude.get(i)], parquetRowBatch.getColumns()[i],
+            parquetBatchLength, columnTypes.get(colsToInclude.get(i)));
+      }
+
+      outputBatch.size += parquetBatchLength;
+
+      return outputBatch.size > 0;
+    }
+
+    private void assignVector(ColumnVector hiveVector,
+        org.apache.parquet.vector.ColumnVector parquetVector, int len,
+        TypeInfo columnType) {
+      assert (hiveVector != null && columnType.getCategory() == ObjectInspector.Category.PRIMITIVE);
+
+      // assign attributes in vector
+      assignVectorAttributes();
+
+      // assign values in vector
+      PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType;
+      switch (primitiveColumnType.getPrimitiveCategory()) {
+        case INT:
+          int[] src_int = ((IntColumnVector) parquetVector).values;
+          long[] dest_int = ((LongColumnVector) hiveVector).vector;
+          for (int j = 0; j < len; j++) {
+            dest_int[j] = src_int[j];
+          }
+          break;
+        case LONG:
+          long[] src_long = ((org.apache.parquet.vector.LongColumnVector) parquetVector).values;
+          long[] dest_long = ((LongColumnVector) hiveVector).vector;
+          for (int j = 0; j < len; j++) {
+            dest_long[j] = src_long[j];
+          }
+          break;
+        // TODO: support all the primitive types
+        case BOOLEAN:
+        case BYTE:
+        case SHORT:
+        case TIMESTAMP:
+        case DATE:
+        case INTERVAL_YEAR_MONTH:
+        case INTERVAL_DAY_TIME:
+        case FLOAT:
+        case DOUBLE:
+        case BINARY:
+        case STRING:
+        case CHAR:
+        case VARCHAR:
+        case DECIMAL:
+        default:
+          throw new RuntimeException("Unsupported data type: " + primitiveColumnType
+              .getPrimitiveCategory());
+      }
+    }
+
+    private void assignVectorAttributes() {
+      // TODO: noNulls, isRepeating, isNull[]. This depends on PARQUET-333
+    }
+
+    /**
+     * gets a ParquetInputSplit corresponding to a split given by Hive
+     *
+     * @param oldSplit The split given by Hive
+     * @param conf The JobConf of the Hive job
+     * @return a ParquetInputSplit corresponding to the oldSplit
+     * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
+     */
+    // TODO: Refactor. This method is copied from ParquetRecordReaderWrapper.
+    protected ParquetInputSplit getSplit(
+        final InputSplit oldSplit,
+        final JobConf conf
+        ) throws IOException {
+      ParquetInputSplit split;
+      if (oldSplit instanceof FileSplit) {
+        final Path finalPath = ((FileSplit) oldSplit).getPath();
+        jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
+        FilterCompat.Filter filter = ParquetRecordReaderWrapper.setFilter(jobConf);
+
+        final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath);
+        final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+        final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+
+        DataWritableReadSupport readSupport = new DataWritableReadSupport();
+        final ReadSupport.ReadContext readContext = readSupport.init(new InitContext(jobConf,
+            null, fileMetaData.getSchema()));
+        final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
+        final long splitStart = ((FileSplit) oldSplit).getStart();
+        final long splitLength = ((FileSplit) oldSplit).getLength();
+        for (final BlockMetaData block : blocks) {
+          final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+          if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
+            splitGroup.add(block);
           }
-          Object[] values = internalValues.get();
+        }
+        if (splitGroup.isEmpty()) {
+          LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit);
+          return null;
+        }
-          if (null == assigners) {
-            // Normally we'd build the assigners from the rbCtx.rowOI, but with Parquet
-            // we have a discrepancy between the metadata type (Eg. tinyint -> BYTE) and
-            // the writable value (IntWritable). see Parquet's ETypeConverter class.
-            assigners = VectorColumnAssignFactory.buildAssigners(outputBatch, values);
+        List<BlockMetaData> filtedBlocks;
+        if (filter != null) {
+          filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
+          if (filtedBlocks.isEmpty()) {
+            LOG.debug("All row groups are dropped due to filter predicates");
+            return null;
           }
-          for(int i = 0; i < values.length; ++i) {
-            assigners[i].assignObjectValue(values[i], outputBatch.size);
+          long droppedBlocks = splitGroup.size() - filtedBlocks.size();
+          if (droppedBlocks > 0) {
+            LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
           }
-          ++outputBatch.size;
-        }
-      } catch (HiveException e) {
-        throw new RuntimeException(e);
+        } else {
+          filtedBlocks = splitGroup;
+        }
+
+        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
+          skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
+        }
+        split = new ParquetInputSplit(finalPath,
+            splitStart,
+            splitLength,
+            ((FileSplit) oldSplit).getLocations(),
+            filtedBlocks,
+            readContext.getRequestedSchema().toString(),
+            fileMetaData.getSchema().toString(),
+            fileMetaData.getKeyValueMetaData(),
+            readContext.getReadSupportMetadata());
+        return split;
+      } else {
+        throw new IllegalArgumentException("Unknown split type: " + oldSplit);
       }
-      return outputBatch.size > 0;
     }
 }
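
A note on the INT branch of assignVector() above: Hive's vectorized row batches store every integral column (tinyint through bigint) in a LongColumnVector backed by a long[], so Parquet int32 values are widened one element at a time. A self-contained sketch of just that conversion (illustrative only; it assumes hive-exec on the classpath and uses a plain int[] as a stand-in for the Parquet-side IntColumnVector.values array):

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    public class IntToLongVectorSketch {
      public static void main(String[] args) {
        int[] parquetInts = {7, 8, 9};                    // stand-in for IntColumnVector.values
        LongColumnVector hiveVector = new LongColumnVector(parquetInts.length);
        for (int j = 0; j < parquetInts.length; j++) {
          hiveVector.vector[j] = parquetInts[j];          // implicit int -> long widening
        }
        System.out.println(hiveVector.vector[2]);         // prints 9
      }
    }
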
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index adeb971..10e8026 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -137,7 +137,7 @@ public ParquetRecordReaderWrapper(
     }
   }
 
-  public FilterCompat.Filter setFilter(final JobConf conf) {
+  public static FilterCompat.Filter setFilter(final JobConf conf) {
     String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
     String columnNamesString = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedParquetReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedParquetReader.java
new file mode 100644
index 0000000..c76425d
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedParquetReader.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Properties;
+
+public class TestVectorizedParquetReader {
+  static final int nElements = 2000;
+  static Path workDir = new Path(System.getProperty("test.tmp.dir","target/tmp"));
+  static final Path warehouseDir = new Path(workDir, "warehouse");
+  static final Path tablePath = new Path(warehouseDir, "vectorization");
+  static final Path partitionPath = new Path(tablePath, "p=0");
+  static final Path file = new Path(partitionPath, "testParquetVectorReader");
+  static final Configuration conf = new Configuration();
+
+  private String columnNames = "binary_field,int32_field,int64_field,int96_field,double_field,"
+      + "float_field,boolean_field";
+  private String columnTypes = "binary,int,bigint,binary,double,float,boolean";
+
+  static final MessageType schema = MessageTypeParser.parseMessageType(
+      "message test { "
+          + "required binary binary_field; "
+          + "required int32 int32_field; "
+          + "required int64 int64_field; "
+          + "required int96 int96_field; "
+          + "required double double_field; "
+          + "required float float_field; "
+          + "required boolean boolean_field; "
+//          + "required fixed_len_byte_array(3) flba_field; "
+//          + "optional binary some_null_field; "
+//          + "optional binary all_null_field; "
+//          + "required binary var_len_binary_field; "
+          + "} ");
+
+  @BeforeClass
+  public static void prepareFile() throws IOException {
+    cleanup();
+
+    int blockSize = 1024*1024;
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+    ParquetWriter<Group> writer = new ParquetWriter<>(file, new GroupWriteSupport(),
+        CompressionCodecName.UNCOMPRESSED, blockSize, 1024, 1024*1024, true, false,
+        ParquetProperties.WriterVersion.PARQUET_2_0, conf);
+    writeData(factory, writer);
+  }
+
+  static void writeData(SimpleGroupFactory factory, ParquetWriter<Group> writer) throws IOException {
+    for (int i = 0; i < nElements; i++) {
+      char c = (char) ((i % 26) + 'a');
+      String b = String.valueOf(c);
+
+      char[] charArray = new char[i + 1];
+      Arrays.fill(charArray, c);
+      Group group = factory.newGroup()
+          .append("binary_field", b)
+          .append("int32_field", i)
+          .append("int64_field", (long) 2 * i)
+          .append("int96_field", Binary.fromByteArray("999999999999".getBytes()))
+          .append("double_field", i * 1.0)
+          .append("float_field", ((float) (i * 2.0)));
+//          .append("boolean_field", i % 5 == 0)
+//          .append("var_len_binary_field", new String(charArray))
+//          .append("flba_field", "foo");
+
+//      if (i % 2 == 1) {
+//        group.append("some_null_field", "test");
+//      }
+      writer.write(group);
+    }
+    writer.close();
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    FileSystem fs = file.getFileSystem(conf);
+    if (fs.exists(file)) {
+      fs.delete(file, true);
+    }
+  }
+
+  @Test
+  public void testVectorReadInt() throws Exception {
+    // create reader
+    JobConf job = createMockEnv("vectorization", "1");
+    List<Integer> columnIndexes = ColumnProjectionUtils.getReadColumnIDs(job);
+
+    RecordReader<NullWritable, VectorizedRowBatch> reader = createReader(job);
+
+    NullWritable key = reader.createKey();
+    VectorizedRowBatch batch = reader.createValue();
+    long expect = 0;
+
+    // read and verify vector
+    while (reader.next(key, batch)) {
+      assertVectorTypes(batch, 1, columnIndexes, LongColumnVector.class);
+
+      LongColumnVector columnVector = (LongColumnVector) batch.cols[columnIndexes.get(0)];
+      for (int i = 0; i < batch.size; i++) {
+        assertVectorValue(columnVector, i, expect);
+        expect++;
+      }
+    }
+  }
+
+  @Test
+  public void testVectorReadLong() throws Exception {
+    // create reader
+    JobConf job = createMockEnv("vectorization", "2");
+    List<Integer> columnIndexes = ColumnProjectionUtils.getReadColumnIDs(job);
+
+    RecordReader<NullWritable, VectorizedRowBatch> reader = createReader(job);
+
+    NullWritable key = reader.createKey();
+    VectorizedRowBatch batch = reader.createValue();
+    long expect = 0;
+
+    // read and verify vector
+    while (reader.next(key, batch)) {
+      assertVectorTypes(batch, 1, columnIndexes, LongColumnVector.class);
+
+      LongColumnVector columnVector = (LongColumnVector) batch.cols[columnIndexes.get(0)];
+      for (int i = 0; i < batch.size; i++) {
+        assertVectorValue(columnVector, i, expect * 2);
+        expect++;
+      }
+    }
+  }
+
+  // TODO: test more data types
+
+  // TODO: testVectorReadMultipleTypes(), testVectorReadAll()
+
+  private void assertVectorTypes(VectorizedRowBatch batch, int columnCount,
+      List<Integer> indexes, Class... vectorType) {
+    int numCols = 0;
+    for(ColumnVector vector : batch.cols) {
+      if (vector != null) {
+        numCols++;
+      }
+    }
+    Assert.assertTrue(numCols == columnCount);
+    Iterator<Integer> indexIterator = indexes.iterator();
+    for (int i = 0; i < columnCount; i++) {
+      ColumnVector vector = batch.cols[indexIterator.next()];
+      Assert.assertTrue(vectorType[i].isInstance(vector));
+    }
+  }
+
+  private void assertVectorValue(ColumnVector vector, int index, Object expect) {
+    if (vector instanceof LongColumnVector) {
+      long value = ((LongColumnVector) vector).vector[index];
+      Assert.assertEquals(expect, value);
+    }
+  }
+
+  private JobConf createMockEnv(String tableName, String columnsIds) throws IOException {
+    Utilities.clearWorkMap();
+    JobConf conf = new JobConf();
+    conf.set("hive.exec.plan", workDir.toString());
+    conf.set("mapred.job.tracker", "local");
+    conf.set("hive.vectorized.execution.enabled", Boolean.toString(true));
+    conf.set("mapred.mapper.class", ExecMapper.class.getName());
+
+    Path tablePath = new Path(warehouseDir, tableName);
+
+    // build partition strings
+    String[] partPath = new String[1];
+    StringBuilder buffer = new StringBuilder();
+    for(int p=0; p < 1; ++p) {
+      partPath[p] = new Path(tablePath, "p=" + p).toString();
+      if (p != 0) {
+        buffer.append(',');
+      }
+      buffer.append(partPath[p]);
+    }
+    conf.set("mapred.input.dir", buffer.toString());
+
+    conf.set("hive.io.file.readcolumn.ids", columnsIds);
+    conf.set("partition_columns", "p");
+    conf.set(serdeConstants.LIST_COLUMNS, columnNames.toString());
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
+
+    Properties tblProps = new Properties();
+    tblProps.put("name", tableName);
+    tblProps.put("serialization.lib", ParquetHiveSerDe.class.getName());
+    tblProps.put("columns", columnNames.toString());
+    tblProps.put("columns.types", columnTypes.toString());
+    TableDesc tableDesc = new TableDesc(VectorizedParquetInputFormat.class,
+        MapredParquetOutputFormat.class, tblProps);
+
+    MapWork mapWork = new MapWork();
+    mapWork.setVectorMode(true);
+    mapWork.setUseBucketizedHiveInputFormat(false);
+    LinkedHashMap<String, ArrayList<String>> aliasMap =
+        new LinkedHashMap<String, ArrayList<String>>();
+    ArrayList<String> aliases = new ArrayList<String>();
+    aliases.add(tableName);
+    LinkedHashMap<String, PartitionDesc> partMap =
+        new LinkedHashMap<String, PartitionDesc>();
+    for(int p=0; p < 1; ++p) {
+      aliasMap.put(partPath[p], aliases);
+      LinkedHashMap<String, String> partSpec =
+          new LinkedHashMap<String, String>();
+      PartitionDesc part = new PartitionDesc(tableDesc, partSpec);
+      partMap.put(partPath[p], part);
+    }
+    mapWork.setPathToAliases(aliasMap);
+    mapWork.setPathToPartitionInfo(partMap);
+
+    // write the plan out
+    FileSystem localFs = FileSystem.getLocal(conf).getRaw();
+    Path mapXml = new Path(workDir, "map.xml");
+    localFs.delete(mapXml, true);
+    FSDataOutputStream planStream = localFs.create(mapXml);
+    Utilities.serializePlan(mapWork, planStream, conf);
+    planStream.close();
+    return conf;
+  }
+
+  private RecordReader<NullWritable, VectorizedRowBatch> createReader(JobConf job) throws
+      IOException {
+    VectorizedParquetInputFormat inputFormat = new VectorizedParquetInputFormat(new
+        ParquetInputFormat<>(DataWritableReadSupport.class));
+    InputSplit[] splits = inputFormat.getSplits(job, 1);
+
+    return inputFormat.getRecordReader(splits[0], job, null);
+  }
+}
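
One possible shape for the testVectorReadMultipleTypes() case left as a TODO in the test above (hypothetical sketch, not part of the patch). It reuses the helpers already defined in TestVectorizedParquetReader and projects int32_field and int64_field together, the two types the current assignVector() implementation supports; both surface as LongColumnVector on the Hive side:

    @Test
    public void testVectorReadMultipleTypes() throws Exception {
      // project column ids 1 (int32_field) and 2 (int64_field) together
      JobConf job = createMockEnv("vectorization", "1,2");
      List<Integer> columnIndexes = ColumnProjectionUtils.getReadColumnIDs(job);

      RecordReader<NullWritable, VectorizedRowBatch> reader = createReader(job);
      NullWritable key = reader.createKey();
      VectorizedRowBatch batch = reader.createValue();
      long expect = 0;

      while (reader.next(key, batch)) {
        assertVectorTypes(batch, 2, columnIndexes, LongColumnVector.class, LongColumnVector.class);

        LongColumnVector ints = (LongColumnVector) batch.cols[columnIndexes.get(0)];
        LongColumnVector longs = (LongColumnVector) batch.cols[columnIndexes.get(1)];
        for (int i = 0; i < batch.size; i++) {
          assertVectorValue(ints, i, expect);
          assertVectorValue(longs, i, expect * 2);
          expect++;
        }
      }
    }
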
diff --git a/ql/src/test/queries/clientpositive/vectorized_parquet_data_types.q b/ql/src/test/queries/clientpositive/vectorized_parquet_data_types.q
new file mode 100644
index 0000000..8580af4
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/vectorized_parquet_data_types.q
@@ -0,0 +1,13 @@
+
+create table if not exists alltypes_parquet (
+  int32_field int) stored as parquet;
+
+LOAD DATA LOCAL INPATH "../../data/files/testParquetFile" INTO TABLE alltypes_parquet;
+
+SET hive.vectorized.execution.enabled=true;
+
+explain select count(int32_field)
+  from alltypes_parquet;
+
+select count(int32_field)
+  from alltypes_parquet;
diff --git a/ql/src/test/results/clientpositive/vectorized_parquet_data_types.q.out b/ql/src/test/results/clientpositive/vectorized_parquet_data_types.q.out
new file mode 100644
index 0000000..1b05507
--- /dev/null
+++ b/ql/src/test/results/clientpositive/vectorized_parquet_data_types.q.out
@@ -0,0 +1,80 @@
+PREHOOK: query: create table if not exists alltypes_parquet (
+  int32_field int) stored as parquet
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@alltypes_parquet
+POSTHOOK: query: create table if not exists alltypes_parquet (
+  int32_field int) stored as parquet
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@alltypes_parquet
+PREHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/testParquetFile" INTO TABLE alltypes_parquet
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@alltypes_parquet
+POSTHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/testParquetFile" INTO TABLE alltypes_parquet
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@alltypes_parquet
+PREHOOK: query: explain select count(int32_field)
+  from alltypes_parquet
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select count(int32_field)
+  from alltypes_parquet
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: alltypes_parquet
+            Statistics: Num rows: 331907 Data size: 1327631 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: int32_field (type: int)
+              outputColumnNames: _col0
+              Statistics: Num rows: 331907 Data size: 1327631 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                aggregations: count(_col0)
+                mode: hash
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  sort order:
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: bigint)
+      Execution mode: vectorized
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: count(VALUE._col0)
+          mode: mergepartial
+          outputColumnNames: _col0
+          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(int32_field)
+  from alltypes_parquet
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypes_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: select count(int32_field)
+  from alltypes_parquet
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypes_parquet
+#### A masked pattern was here ####
+100000