diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
index c0bfc3f..c1c6103 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
@@ -18,15 +18,20 @@
 package org.apache.hadoop.hive.ql.io.parquet;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.io.parquet.serde.VectorizedColumnReaderTestUtils;
+import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -43,6 +48,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -54,6 +60,7 @@
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -65,44 +72,60 @@
   JobConf conf;
   String columnNames;
   String columnTypes;
+  Path testPath;
+  String schemaStr;
+  MessageType fileSchema;
+  StructObjectInspector inspector;
 
   @Before
   public void initConf() throws Exception {
     conf = new JobConf();
-
-  }
-
-  @Test
-  public void testRowGroupFilterTakeEffect() throws Exception {
     // define schema
     columnNames = "intCol";
     columnTypes = "int";
-    StructObjectInspector inspector = getObjectInspector(columnNames, columnTypes);
-    MessageType fileSchema = MessageTypeParser.parseMessageType(
-        "message hive_schema {\n"
-            + "  optional int32 intCol;\n"
-            + "}\n"
+    schemaStr =
+        "message hive_schema {\n"
+            + "  optional int32 intCol;\n"
+            + "}\n";
+    fileSchema = MessageTypeParser.parseMessageType(
+        schemaStr
     );
+    inspector = getObjectInspector(columnNames, columnTypes);
     conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "intCol");
     conf.set("columns", "intCol");
     conf.set("columns.types", "int");
 
     // create Parquet file with specific data
-    Path testPath = writeDirect("RowGroupFilterTakeEffect", fileSchema,
-        new DirectWriter() {
-          @Override
-          public void write(RecordConsumer consumer) {
-            for(int i = 0; i < 100; i++) {
-              consumer.startMessage();
-              consumer.startField("int", 0);
-              consumer.addInteger(i);
-              consumer.endField("int", 0);
-              consumer.endMessage();
-            }
-          }
-        });
+    testPath = writeDirect("RowGroupFilterTakeEffect", fileSchema, new DirectWriter() {
+      @Override
+      public void write(RecordConsumer consumer) {
+        for (int i = 0; i < 100; i++) {
+          consumer.startMessage();
+          consumer.startField("int", 0);
+          consumer.addInteger(i);
+          consumer.endField("int", 0);
+          consumer.endMessage();
+        }
+      }
+    });
+  }
+
+  @After
+  public void tearDown() {
+    FileSystem fs;
+    try {
+      fs = testPath.getFileSystem(conf);
+      if (fs.exists(testPath)) {
+        fs.delete(testPath, true);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to delete the test path", e);
+    }
+  }
 
+  @Test
+  public void testRowGroupFilterTakeEffect() throws Exception {
     FileSplit testSplit = new FileSplit(testPath, 0, fileLength(testPath), (String[]) null);
     ParquetInputSplit parquetSplit = new ParquetInputSplit(testPath, testSplit.getStart(),
@@ -112,14 +135,7 @@ public void write(RecordConsumer consumer) {
         null);
 
     // > 50
-    GenericUDF udf = new GenericUDFOPGreaterThan();
-    List<ExprNodeDesc> children = Lists.newArrayList();
-    ExprNodeColumnDesc columnDesc = new ExprNodeColumnDesc(Integer.class, "intCol", "T", false);
-    ExprNodeConstantDesc constantDesc = new ExprNodeConstantDesc(50);
-    children.add(columnDesc);
-    children.add(constantDesc);
-    ExprNodeGenericFuncDesc genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
-    String searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
+    String searchArgumentStr = buildSearchArgumentStr1();
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
     SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf);
     FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema);
@@ -139,10 +155,7 @@ public void write(RecordConsumer consumer) {
     }
 
     // > 100
-    constantDesc = new ExprNodeConstantDesc(100);
-    children.set(1, constantDesc);
-    genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
-    searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
+    searchArgumentStr = buildSearchArgumentStr2();
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
     sarg = ConvertAstToSearchArg.createFromConf(conf);
     p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema);
@@ -160,6 +173,78 @@ public void write(RecordConsumer consumer) {
     }
   }
 
+  @Test
+  public void testRowGroupFilterTakeEffectForVectorization() throws Exception {
+    // > 50
+    String searchArgumentStr = buildSearchArgumentStr1();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
+    SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema);
+
+    ParquetInputFormat.setFilterPredicate(conf, p);
+
+    VectorizedParquetRecordReader reader =
+        VectorizedColumnReaderTestUtils.createParquetReader(schemaStr, conf, testPath);
+    VectorizedRowBatch previous = reader.createValue();
+
+    try {
+      boolean hasValue = reader.next(NullWritable.get(), previous);
+      Assert.assertTrue("No row groups should be filtered.", hasValue);
+    } finally {
+      reader.close();
+    }
+
+    // > 100
+    searchArgumentStr = buildSearchArgumentStr2();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
+    sarg = ConvertAstToSearchArg.createFromConf(conf);
+    p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema);
+    ParquetInputFormat.setFilterPredicate(conf, p);
+
+    reader =
+        VectorizedColumnReaderTestUtils.createParquetReader(schemaStr, conf, testPath);
+    previous = reader.createValue();
+    try {
+      boolean hasValue = reader.next(NullWritable.get(), previous);
+      Assert.assertFalse("Row groups should be filtered.", hasValue);
+    } finally {
+      reader.close();
+    }
+  }
+
+  /**
+   * Builds the serialized search argument for the predicate intCol > 50.
+   *
+   * @return the serialized search argument string
+   */
+  private String buildSearchArgumentStr1() {
+    GenericUDF udf = new GenericUDFOPGreaterThan();
+    List<ExprNodeDesc> children = Lists.newArrayList();
+    ExprNodeColumnDesc columnDesc = new ExprNodeColumnDesc(Integer.class, "intCol", "T", false);
+    ExprNodeConstantDesc constantDesc = new ExprNodeConstantDesc(50);
+    children.add(columnDesc);
+    children.add(constantDesc);
+    ExprNodeGenericFuncDesc genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
+    return SerializationUtilities.serializeExpression(genericFuncDesc);
+  }
+
+  /**
+   * Builds the serialized search argument for the predicate intCol > 100.
+   *
+   * @return the serialized search argument string
+   */
+  private String buildSearchArgumentStr2() {
+    GenericUDF udf = new GenericUDFOPGreaterThan();
+    List<ExprNodeDesc> children = Lists.newArrayList();
+    ExprNodeConstantDesc constantDesc = new ExprNodeConstantDesc(100);
+    ExprNodeColumnDesc columnDesc = new ExprNodeColumnDesc(Integer.class, "intCol", "T", false);
+    children.add(columnDesc);
+    children.add(constantDesc);
+    ExprNodeGenericFuncDesc genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
+    return SerializationUtilities.serializeExpression(genericFuncDesc);
+  }
+
   private ArrayWritableObjectInspector getObjectInspector(final String columnNames, final String columnTypes) {
     List<TypeInfo> columnTypeList = createHiveTypeInfoFrom(columnTypes);
     List<String> columnNameList = createHiveColumnsFrom(columnNames);
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
index 670bfa6..9ea36e8 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.VectorizedColumnReaderTestUtils;
 import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -104,7 +105,7 @@ public void testNullSplitForParquetReader() throws Exception {
     conf.set(IOConstants.COLUMNS_TYPES,"int");
     HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
     HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
-    initialVectorizedRowBatchCtx(conf);
+    VectorizedColumnReaderTestUtils.initialVectorizedRowBatchCtx(conf);
     VectorizedParquetRecordReader reader =
         new VectorizedParquetRecordReader((InputSplit)null, new JobConf(conf));
     assertFalse(reader.next(reader.createKey(), reader.createValue()));
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
index f537cee..2da94cc 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
@@ -19,48 +19,32 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
-import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.io.parquet.serde.VectorizedColumnReaderTestUtils;
 import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
-import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.util.List;
 
 import static junit.framework.Assert.assertTrue;
 import static junit.framework.TestCase.assertFalse;
 import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
-import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
 import static org.junit.Assert.assertEquals;
@@ -213,18 +197,10 @@ protected static boolean isNull(int index) {
     return (index % NULL_FREQUENCY == 0);
   }
 
-  protected VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf)
-      throws IOException, InterruptedException, HiveException {
-    conf.set(PARQUET_READ_SCHEMA, schemaString);
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
-    HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
-
-    Job vectorJob = new Job(conf, "read vector");
-    ParquetInputFormat.setInputPaths(vectorJob, file);
-    ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class);
-    InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0);
-    initialVectorizedRowBatchCtx(conf);
-    return new VectorizedParquetRecordReader(split, new JobConf(conf));
+  public static VectorizedParquetRecordReader createParquetReader(
+      String schemaString,
+      Configuration conf) throws IOException, InterruptedException, HiveException {
+    return VectorizedColumnReaderTestUtils.createParquetReader(schemaString, conf, file);
   }
 
   protected static void writeData(ParquetWriter writer, boolean isDictionaryEncoding) throws IOException {
@@ -295,24 +271,7 @@ protected static void writeData(ParquetWriter writer, boolean isDictionar
     writer.close();
   }
 
-  protected void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException {
-    MapWork mapWork = new MapWork();
-    VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
-    rbCtx.init(createStructObjectInspector(conf), new String[0]);
-    mapWork.setVectorMode(true);
-    mapWork.setVectorizedRowBatchCtx(rbCtx);
-    Utilities.setMapWork(conf, mapWork);
-  }
 
-  private StructObjectInspector createStructObjectInspector(Configuration conf) {
-    // Create row related objects
-    String columnNames = conf.get(IOConstants.COLUMNS);
-    List<String> columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
-    String columnTypes = conf.get(IOConstants.COLUMNS_TYPES);
-    List<TypeInfo> columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
-    TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
-    return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
-  }
 
   protected void intRead(boolean isDictionaryEncoding) throws InterruptedException, HiveException, IOException {
     Configuration conf = new Configuration();
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/serde/VectorizedColumnReaderTestUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/serde/VectorizedColumnReaderTestUtils.java
new file mode 100644
index 0000000..5288a49
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/serde/VectorizedColumnReaderTestUtils.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.serde;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
+
+/**
+ * Shared helpers for the vectorized Parquet reader tests: creates a
+ * {@link VectorizedParquetRecordReader} over a test file and registers a
+ * {@link VectorizedRowBatchCtx} in the job configuration via MapWork.
+ */
+public class VectorizedColumnReaderTestUtils {
+  public static VectorizedParquetRecordReader createParquetReader(
+      String schemaString,
+      Configuration conf,
+      Path file) throws IOException, InterruptedException, HiveException {
+    conf.set(PARQUET_READ_SCHEMA, schemaString);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
+
+    Job vectorJob = new Job(conf, "read vector");
+    ParquetInputFormat.setInputPaths(vectorJob, file);
+    ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class);
+    InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0);
+    initialVectorizedRowBatchCtx(conf);
+    return new VectorizedParquetRecordReader(split, new JobConf(conf));
+  }
+
+  public static void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException {
+    MapWork mapWork = new MapWork();
+    VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
+    rbCtx.init(createStructObjectInspector(conf), new String[0]);
+    mapWork.setVectorMode(true);
+    mapWork.setVectorizedRowBatchCtx(rbCtx);
+    Utilities.setMapWork(conf, mapWork);
+  }
+
+  private static StructObjectInspector createStructObjectInspector(Configuration conf) {
+    // Create row related objects
+    String columnNames = conf.get(IOConstants.COLUMNS);
+    List<String> columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
+    String columnTypes = conf.get(IOConstants.COLUMNS_TYPES);
+    List<TypeInfo> columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
+    TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
+    return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
+  }
+}