diff --git a/pom.xml b/pom.xml
index 1abf738..45649b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,7 +153,7 @@
1.9.5
2.0.0-M5
4.0.23.Final
- <parquet.version>1.7.0</parquet.version>
+ <parquet.version>1.8.0rc2-SNAPSHOT</parquet.version>
0.12.0
2.5.0
1.0.1
diff --git a/ql/pom.xml b/ql/pom.xml
index 6026c49..4c61f60 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -321,6 +321,13 @@
test
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop-bundle</artifactId>
+ <classifier>tests</classifier>
+ <version>${parquet.version}</version>
+ <scope>test</scope>
+ </dependency>
junit
junit
${junit.version}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
index e1b6dd8..2311c82 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
@@ -17,6 +17,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable;
@@ -30,11 +31,9 @@
/**
*
* A Parquet InputFormat for Hive (with the deprecated package mapred)
- *
- * NOTE: With HIVE-9235 we removed "implements VectorizedParquetInputFormat" since all data types
- * are not currently supported. Removing the interface turns off vectorization.
*/
-public class MapredParquetInputFormat extends FileInputFormat<NullWritable, ObjectArrayWritable> {
+public class MapredParquetInputFormat extends FileInputFormat<NullWritable, ObjectArrayWritable> implements VectorizedInputFormatInterface {
private static final Log LOG = LogFactory.getLog(MapredParquetInputFormat.class);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
index 98691c7..3b49ada 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
@@ -14,16 +14,29 @@
package org.apache.hadoop.hive.ql.io.parquet;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Strings;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssign;
-import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssignFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
@@ -32,7 +45,22 @@
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.ParquetRecordReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.vector.IntColumnVector;
+import org.apache.parquet.vector.RowBatch;
/**
* Vectorized input format for Parquet files
@@ -49,24 +77,61 @@
RecordReader<NullWritable, VectorizedRowBatch> {
private static final Log LOG = LogFactory.getLog(VectorizedParquetRecordReader.class);
- private final ParquetRecordReaderWrapper internalReader;
- private VectorizedRowBatchCtx rbCtx;
- private ObjectArrayWritable internalValues;
- private NullWritable internalKey;
- private VectorColumnAssign[] assigners;
+ private JobConf jobConf;
+ private final long splitLen;
+ private boolean skipTimestampConversion = false;
+ private final ProjectionPusher projectionPusher;
+
+ private ParquetRecordReader<ObjectArrayWritable> internalReader;
+ private RowBatch parquetRowBatch;
+ private VectorizedRowBatchCtx rbCtx;
+
+ // Column projection list of column indexes to include; it does not contain partition columns.
+ private List<Integer> colsToInclude;
+ // The types of all the columns
+ private List<TypeInfo> columnTypes;
public VectorizedParquetRecordReader(
- ParquetInputFormat<ObjectArrayWritable> realInput,
- FileSplit split,
- JobConf conf, Reporter reporter) throws IOException, InterruptedException {
- internalReader = new ParquetRecordReaderWrapper(
- realInput,
- split,
- conf,
- reporter);
+ ParquetInputFormat<ObjectArrayWritable> newInputFormat,
+ FileSplit oldSplit,
+ JobConf oldJobConf, Reporter reporter) throws IOException, InterruptedException {
+
+ // create internalReader, which is a Parquet vectorized reader
+ this.splitLen = oldSplit.getLength();
+ this.projectionPusher = new ProjectionPusher();
+
+ jobConf = oldJobConf;
+ final ParquetInputSplit split = getSplit(oldSplit, jobConf);
+
+ TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get(IOConstants.MAPRED_TASK_ID));
+
+ if (taskAttemptID == null) {
+ taskAttemptID = new TaskAttemptID();
+ }
+
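+ // getSplit() may have toggled skipTimestampConversion based on the file footer; if it differs
+ // from the current conf value, push the detected value back into the job conf.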
+ if (skipTimestampConversion ^ HiveConf.getBoolVar(
+ jobConf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
+ HiveConf.setBoolVar(jobConf,
+ HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION, skipTimestampConversion);
+ }
+
+ final TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
+
+ if (split != null) {
+ internalReader = (ParquetRecordReader) newInputFormat.createRecordReader(split,
+ taskContext);
+ internalReader.initialize(split, taskContext);
+ parquetRowBatch = new RowBatch();
+ }
+
+ // init vectorization related attributes
+ columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(oldJobConf.get(IOConstants
+ .COLUMNS_TYPES));
+ colsToInclude = ColumnProjectionUtils.getReadColumnIDs(oldJobConf);
+
try {
rbCtx = new VectorizedRowBatchCtx();
- rbCtx.init(conf, split);
+ rbCtx.init(oldJobConf, oldSplit);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -74,7 +139,6 @@ public VectorizedParquetRecordReader(
@Override
public NullWritable createKey() {
- internalKey = internalReader.createKey();
return NullWritable.get();
}
@@ -83,7 +147,6 @@ public VectorizedRowBatch createValue() {
VectorizedRowBatch outputBatch = null;
try {
outputBatch = rbCtx.createVectorizedRowBatch();
- internalValues = internalReader.createValue();
} catch (HiveException e) {
throw new RuntimeException("Error creating a batch", e);
}
@@ -92,7 +155,8 @@ public VectorizedRowBatch createValue() {
@Override
public long getPos() throws IOException {
- return internalReader.getPos();
+ // this impl is copied from ParquetRecordReaderWrapper.
+ return (long) (splitLen * getProgress());
}
@Override
@@ -102,41 +166,174 @@ public void close() throws IOException {
@Override
public float getProgress() throws IOException {
- return internalReader.getProgress();
+ if (internalReader == null) {
+ return 1f;
+ } else {
+ try {
+ return internalReader.getProgress();
+ } catch (final InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
}
@Override
public boolean next(NullWritable key, VectorizedRowBatch outputBatch)
throws IOException {
- if (assigners != null) {
- assert(outputBatch.numCols == assigners.length);
- }
+
outputBatch.reset();
- int maxSize = outputBatch.getMaxSize();
+
+ // Check whether it is the end of file
try {
- while (outputBatch.size < maxSize) {
- if (false == internalReader.next(internalKey, internalValues)) {
- outputBatch.endOfFile = true;
- break;
+ if (internalReader.getProgress() == 1.0) {
+ outputBatch.endOfFile = true;
+ return false;
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ // read a row batch from Parquet
+ parquetRowBatch = internalReader.nextBatch(parquetRowBatch);
+ assert (colsToInclude.size() == parquetRowBatch.getColumns().length);
+
+ org.apache.parquet.vector.ColumnVector[] columnVectors = parquetRowBatch.getColumns();
+ int parquetBatchLength = columnVectors[0].size();
+ assert (outputBatch.getMaxSize() >= parquetBatchLength);
+
+ // convert each Parquet column vector into the corresponding Hive column vector of the output batch
+ for (int i = 0; i < parquetRowBatch.getColumns().length; i++) {
+ assignVector(outputBatch.cols[colsToInclude.get(i)], parquetRowBatch.getColumns()[i],
+ parquetBatchLength, columnTypes.get(colsToInclude.get(i)));
+ }
+
+ outputBatch.size += parquetBatchLength;
+
+ return outputBatch.size > 0;
+ }
+
+ private void assignVector(ColumnVector hiveVector,
+ org.apache.parquet.vector.ColumnVector parquetVector, int len,
+ TypeInfo columnType) {
+ assert (hiveVector != null && columnType.getCategory() == ObjectInspector.Category.PRIMITIVE);
+
+ // assign attributes in vector
+ assignVectorAttributes();
+
+ // assign values in vector
+ PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType;
+ switch (primitiveColumnType.getPrimitiveCategory()) {
+ case INT:
+ int[] src_int = ((IntColumnVector)parquetVector).values;
+ long[] dest_int = ((LongColumnVector)hiveVector).vector;
+ for(int j = 0; j < len; j++) {
+ dest_int[j] = src_int[j];
+ }
+ break;
+ case LONG:
+ long[] src_long = ((org.apache.parquet.vector.LongColumnVector)parquetVector).values;
+ long[] dest_long = ((LongColumnVector)hiveVector).vector;
+ for(int j = 0; j < len; j++) {
+ dest_long[j] = src_long[j];
+ }
+ break;
+ // TODO: support all the primitive types
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case TIMESTAMP:
+ case DATE:
+ case INTERVAL_YEAR_MONTH:
+ case INTERVAL_DAY_TIME:
+ case FLOAT:
+ case DOUBLE:
+ case BINARY:
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ case DECIMAL:
+ default:
+ throw new RuntimeException("Not support data type: " + primitiveColumnType
+ .getPrimitiveCategory());
+ }
+ }
+
+ private void assignVectorAttributes() {
+ // TODO: noNulls, isRepeating, isNull[]. This depends on PARQUET-333
+ }
+
+ /**
+ * gets a ParquetInputSplit corresponding to a split given by Hive
+ *
+ * @param oldSplit The split given by Hive
+ * @param conf The JobConf of the Hive job
+ * @return a ParquetInputSplit corresponding to the oldSplit
+ * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
+ */
+ // TODO: Refactor. This method is copied from ParquetRecordReaderWrapper.
+ protected ParquetInputSplit getSplit(
+ final InputSplit oldSplit,
+ final JobConf conf
+ ) throws IOException {
+ ParquetInputSplit split;
+ if (oldSplit instanceof FileSplit) {
+ final Path finalPath = ((FileSplit) oldSplit).getPath();
+ jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
+ FilterCompat.Filter filter = ParquetRecordReaderWrapper.setFilter(jobConf);
+
+ final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath);
+ final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+ final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+
+ DataWritableReadSupport readSupport = new DataWritableReadSupport();
+ final ReadSupport.ReadContext readContext = readSupport.init(new InitContext(jobConf,
+ null, fileMetaData.getSchema()));
+ final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
+ final long splitStart = ((FileSplit) oldSplit).getStart();
+ final long splitLength = ((FileSplit) oldSplit).getLength();
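+ // keep only the row groups whose first data page offset falls within this split's byte range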
+ for (final BlockMetaData block : blocks) {
+ final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+ if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
+ splitGroup.add(block);
}
- Object[] values = internalValues.get();
+ }
+ if (splitGroup.isEmpty()) {
+ LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit);
+ return null;
+ }
- if (null == assigners) {
- // Normally we'd build the assigners from the rbCtx.rowOI, but with Parquet
- // we have a discrepancy between the metadata type (Eg. tinyint -> BYTE) and
- // the writable value (IntWritable). see Parquet's ETypeConverter class.
- assigners = VectorColumnAssignFactory.buildAssigners(outputBatch, values);
+ List<BlockMetaData> filtedBlocks;
+ if (filter != null) {
+ filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
+ if (filtedBlocks.isEmpty()) {
+ LOG.debug("All row groups are dropped due to filter predicates");
+ return null;
}
- for(int i = 0; i < values.length; ++i) {
- assigners[i].assignObjectValue(values[i], outputBatch.size);
+ long droppedBlocks = splitGroup.size() - filtedBlocks.size();
+ if (droppedBlocks > 0) {
+ LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
}
- ++outputBatch.size;
- }
- } catch (HiveException e) {
- throw new RuntimeException(e);
+ } else {
+ filtedBlocks = splitGroup;
+ }
+
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
+ skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
+ }
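+ // build a ParquetInputSplit that carries the surviving row groups plus the requested and file schemas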
+ split = new ParquetInputSplit(finalPath,
+ splitStart,
+ splitLength,
+ ((FileSplit) oldSplit).getLocations(),
+ filtedBlocks,
+ readContext.getRequestedSchema().toString(),
+ fileMetaData.getSchema().toString(),
+ fileMetaData.getKeyValueMetaData(),
+ readContext.getReadSupportMetadata());
+ return split;
+ } else {
+ throw new IllegalArgumentException("Unknown split type: " + oldSplit);
}
- return outputBatch.size > 0;
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index adeb971..10e8026 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -137,7 +137,7 @@ public ParquetRecordReaderWrapper(
}
}
- public FilterCompat.Filter setFilter(final JobConf conf) {
+ public static FilterCompat.Filter setFilter(final JobConf conf) {
String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
String columnNamesString =
conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedParquetReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedParquetReader.java
new file mode 100644
index 0000000..c76425d
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedParquetReader.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Properties;
+
+public class TestVectorizedParquetReader {
+ static final int nElements = 2000;
+ static Path workDir = new Path(System.getProperty("test.tmp.dir","target/tmp"));
+ static final Path warehouseDir = new Path(workDir, "warehouse");
+ static final Path tablePath = new Path(warehouseDir, "vectorization");
+ static final Path partitionPath = new Path(tablePath, "p=0");
+ static final Path file = new Path(partitionPath, "testParquetVectorReader");
+ static final Configuration conf = new Configuration();
+
+ private String columnNames = "binary_field,int32_field,int64_field,int96_field,double_field," +
+ "float_field,boolean_field";
+ private String columnTypes = "binary,int,bigint,binary,double,float,boolean";
+
+ static final MessageType schema = MessageTypeParser.parseMessageType(
+ "message test { "
+ + "required binary binary_field; "
+ + "required int32 int32_field; "
+ + "required int64 int64_field; "
+ + "required int96 int96_field; "
+ + "required double double_field; "
+ + "required float float_field; "
+ + "required boolean boolean_field; "
+// + "required fixed_len_byte_array(3) flba_field; "
+// + "optional binary some_null_field; "
+// + "optional binary all_null_field; "
+// + "required binary var_len_binary_field; "
+ + "} ");
+
+ @BeforeClass
+ public static void prepareFile() throws IOException {
+ cleanup();
+
+ int blockSize = 1024*1024;
+ GroupWriteSupport.setSchema(schema, conf);
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+ ParquetWriter<Group> writer = new ParquetWriter<Group>(file, new GroupWriteSupport(),
+ CompressionCodecName.UNCOMPRESSED, blockSize, 1024, 1024*1024, true, false,
+ ParquetProperties.WriterVersion.PARQUET_2_0, conf);
+ writeData(factory, writer);
+ }
+
+ static void writeData(SimpleGroupFactory factory, ParquetWriter<Group> writer) throws IOException {
+ for (int i = 0; i < nElements; i++) {
+ char c = (char) ((i % 26) + 'a');
+ String b = String.valueOf(c);
+
+ char[] charArray = new char[i + 1];
+ Arrays.fill(charArray, c);
+ Group group = factory.newGroup()
+ .append("binary_field", b)
+ .append("int32_field", i)
+ .append("int64_field", (long) 2 * i)
+ .append("int96_field", Binary.fromByteArray("999999999999".getBytes()))
+ .append("double_field", i * 1.0)
+ .append("float_field", ((float) (i * 2.0)));
+// .append("boolean_field", i % 5 == 0)
+// .append("var_len_binary_field", new String(charArray))
+// .append("flba_field", "foo");
+
+// if (i % 2 == 1) {
+// group.append("some_null_field", "test");
+// }
+ writer.write(group);
+ }
+ writer.close();
+ }
+
+ @AfterClass
+ public static void cleanup() throws IOException {
+ FileSystem fs = file.getFileSystem(conf);
+ if (fs.exists(file)) {
+ fs.delete(file, true);
+ }
+ }
+
+ @Test
+ public void testVectorReadInt() throws Exception {
+ // create reader
+ JobConf job = createMockEnv("vectorization", "1");
+ List<Integer> columnIndexes = ColumnProjectionUtils.getReadColumnIDs(job);
+
+ RecordReader<NullWritable, VectorizedRowBatch> reader = createReader(job);
+
+ NullWritable key = reader.createKey();
+ VectorizedRowBatch batch = reader.createValue();
+ long expect = 0;
+
+ // read and verify vector
+ while (reader.next(key, batch)) {
+ assertVectorTypes(batch, 1, columnIndexes, LongColumnVector.class);
+
+ LongColumnVector columnVector = (LongColumnVector) batch.cols[columnIndexes.get(0)];
+ for (int i = 0; i < batch.size; i++) {
+ assertVectorValue(columnVector, i, expect);
+ expect++;
+ }
+ }
+ }
+
+ @Test
+ public void testVectorReadLong() throws Exception {
+ // create reader
+ JobConf job = createMockEnv("vectorization", "2");
+ List<Integer> columnIndexes = ColumnProjectionUtils.getReadColumnIDs(job);
+
+ RecordReader<NullWritable, VectorizedRowBatch> reader = createReader(job);
+
+ NullWritable key = reader.createKey();
+ VectorizedRowBatch batch = reader.createValue();
+ long expect = 0;
+
+ // read and verify vector
+ while (reader.next(key, batch)) {
+ assertVectorTypes(batch, 1, columnIndexes, LongColumnVector.class);
+
+ LongColumnVector columnVector = (LongColumnVector) batch.cols[columnIndexes.get(0)];
+ for (int i = 0; i < batch.size; i++) {
+ assertVectorValue(columnVector, i, expect * 2);
+ expect++;
+ }
+ }
+ }
+
+ // TODO: test more data types
+
+ // TODO: testVectorReadMultipleTypes(), testVectorReadAll()
+
+ private void assertVectorTypes(VectorizedRowBatch batch, int columnCount,
+ List<Integer> indexes, Class<?>... vectorType) {
+ int numCols = 0;
+ for(ColumnVector vector : batch.cols) {
+ if (vector != null) {
+ numCols++;
+ }
+ }
+ Assert.assertTrue(numCols == columnCount);
+ Iterator<Integer> indexIterator = indexes.iterator();
+ for (int i = 0; i < columnCount; i++) {
+ ColumnVector vector = batch.cols[indexIterator.next()];
+ Assert.assertTrue(vectorType[i].isInstance(vector));
+ }
+ }
+
+ private void assertVectorValue(ColumnVector vector, int index, Object expect) {
+ if (vector instanceof LongColumnVector) {
+ long value = ((LongColumnVector) vector).vector[index];
+ Assert.assertEquals(expect, value);
+ }
+ }
+
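+ // Builds a JobConf and a serialized MapWork plan (map.xml) that mimic a single-partition
+ // Hive table, so the input format can resolve column projections and partition info.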
+ private JobConf createMockEnv(String tableName, String columnsIds) throws IOException {
+ Utilities.clearWorkMap();
+ JobConf conf = new JobConf();
+ conf.set("hive.exec.plan", workDir.toString());
+ conf.set("mapred.job.tracker", "local");
+ conf.set("hive.vectorized.execution.enabled", Boolean.toString(true));
+ conf.set("mapred.mapper.class", ExecMapper.class.getName());
+
+ Path tablePath = new Path(warehouseDir, tableName);
+
+ // build partition strings
+ String[] partPath = new String[1];
+ StringBuilder buffer = new StringBuilder();
+ for(int p=0; p < 1; ++p) {
+ partPath[p] = new Path(tablePath, "p=" + p).toString();
+ if (p != 0) {
+ buffer.append(',');
+ }
+ buffer.append(partPath[p]);
+ }
+ conf.set("mapred.input.dir", buffer.toString());
+
+ conf.set("hive.io.file.readcolumn.ids", columnsIds);
+ conf.set("partition_columns", "p");
+ conf.set(serdeConstants.LIST_COLUMNS, columnNames.toString());
+ conf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
+
+ Properties tblProps = new Properties();
+ tblProps.put("name", tableName);
+ tblProps.put("serialization.lib", ParquetHiveSerDe.class.getName());
+ tblProps.put("columns", columnNames.toString());
+ tblProps.put("columns.types", columnTypes.toString());
+ TableDesc tableDesc = new TableDesc(VectorizedParquetInputFormat.class,
+ MapredParquetOutputFormat.class, tblProps);
+
+ MapWork mapWork = new MapWork();
+ mapWork.setVectorMode(true);
+ mapWork.setUseBucketizedHiveInputFormat(false);
+ LinkedHashMap<String, ArrayList<String>> aliasMap =
+ new LinkedHashMap<String, ArrayList<String>>();
+ ArrayList<String> aliases = new ArrayList<String>();
+ aliases.add(tableName);
+ LinkedHashMap<String, PartitionDesc> partMap =
+ new LinkedHashMap<String, PartitionDesc>();
+ for(int p=0; p < 1; ++p) {
+ aliasMap.put(partPath[p], aliases);
+ LinkedHashMap<String, String> partSpec =
+ new LinkedHashMap<String, String>();
+ PartitionDesc part = new PartitionDesc(tableDesc, partSpec);
+ partMap.put(partPath[p], part);
+ }
+ mapWork.setPathToAliases(aliasMap);
+ mapWork.setPathToPartitionInfo(partMap);
+
+ // write the plan out
+ FileSystem localFs = FileSystem.getLocal(conf).getRaw();
+ Path mapXml = new Path(workDir, "map.xml");
+ localFs.delete(mapXml, true);
+ FSDataOutputStream planStream = localFs.create(mapXml);
+ Utilities.serializePlan(mapWork, planStream, conf);
+ planStream.close();
+ return conf;
+ }
+
+ private RecordReader<NullWritable, VectorizedRowBatch> createReader(JobConf job)
+ throws IOException {
+ VectorizedParquetInputFormat inputFormat = new VectorizedParquetInputFormat(new
+ ParquetInputFormat<>(DataWritableReadSupport.class));
+ InputSplit[] splits = inputFormat.getSplits(job, 1);
+
+ return inputFormat.getRecordReader(splits[0], job, null);
+ }
+}
diff --git a/ql/src/test/queries/clientpositive/vectorized_parquet_data_types.q b/ql/src/test/queries/clientpositive/vectorized_parquet_data_types.q
new file mode 100644
index 0000000..8580af4
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/vectorized_parquet_data_types.q
@@ -0,0 +1,13 @@
+
+create table if not exists alltypes_parquet (
+ int32_field int) stored as parquet;
+
+LOAD DATA LOCAL INPATH "../../data/files/testParquetFile" INTO TABLE alltypes_parquet;
+
+SET hive.vectorized.execution.enabled=true;
+
+explain select count(int32_field)
+ from alltypes_parquet;
+
+select count(int32_field)
+ from alltypes_parquet;
diff --git a/ql/src/test/results/clientpositive/vectorized_parquet_data_types.q.out b/ql/src/test/results/clientpositive/vectorized_parquet_data_types.q.out
new file mode 100644
index 0000000..1b05507
--- /dev/null
+++ b/ql/src/test/results/clientpositive/vectorized_parquet_data_types.q.out
@@ -0,0 +1,80 @@
+PREHOOK: query: create table if not exists alltypes_parquet (
+ int32_field int) stored as parquet
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@alltypes_parquet
+POSTHOOK: query: create table if not exists alltypes_parquet (
+ int32_field int) stored as parquet
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@alltypes_parquet
+PREHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/testParquetFile" INTO TABLE alltypes_parquet
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@alltypes_parquet
+POSTHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/testParquetFile" INTO TABLE alltypes_parquet
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@alltypes_parquet
+PREHOOK: query: explain select count(int32_field)
+ from alltypes_parquet
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select count(int32_field)
+ from alltypes_parquet
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: alltypes_parquet
+ Statistics: Num rows: 331907 Data size: 1327631 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: int32_field (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 331907 Data size: 1327631 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count(_col0)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Execution mode: vectorized
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(int32_field)
+ from alltypes_parquet
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypes_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: select count(int32_field)
+ from alltypes_parquet
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypes_parquet
+#### A masked pattern was here ####
+100000