diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index a91bf90..1abeca8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -74,7 +74,7 @@
   private boolean skipTimestampConversion = false;
   private JobConf jobConf;
   private final ProjectionPusher projectionPusher;
-  private List<BlockMetaData> filtedBlocks;
+  private List<BlockMetaData> filteredBlocks;
 
   public ParquetRecordReaderWrapper(
       final ParquetInputFormat<ArrayWritable> newInputFormat,
@@ -271,18 +271,18 @@ protected ParquetInputSplit getSplit(
       }
 
       if (filter != null) {
-        filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
-        if (filtedBlocks.isEmpty()) {
+        filteredBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
+        if (filteredBlocks.isEmpty()) {
           LOG.debug("All row groups are dropped due to filter predicates");
           return null;
         }
 
-        long droppedBlocks = splitGroup.size() - filtedBlocks.size();
+        long droppedBlocks = splitGroup.size() - filteredBlocks.size();
         if (droppedBlocks > 0) {
           LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
         }
       } else {
-        filtedBlocks = splitGroup;
+        filteredBlocks = splitGroup;
       }
 
       if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
@@ -292,7 +292,7 @@ protected ParquetInputSplit getSplit(
             splitStart,
             splitLength,
             ((FileSplit) oldSplit).getLocations(),
-            filtedBlocks,
+            filteredBlocks,
             readContext.getRequestedSchema().toString(),
             fileMetaData.getSchema().toString(),
             fileMetaData.getKeyValueMetaData(),
@@ -303,7 +303,7 @@ protected ParquetInputSplit getSplit(
     }
   }
 
-  public List<BlockMetaData> getFiltedBlocks() {
-    return filtedBlocks;
+  public List<BlockMetaData> getFilteredBlocks() {
+    return filteredBlocks;
   }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java
index d44385e..6735bb5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java
@@ -36,6 +36,7 @@
 public abstract class AbstractTestParquetDirect {
 
   public static FileSystem localFS = null;
+  private int blockSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
 
   @BeforeClass
   public static void initializeFS() throws IOException {
@@ -79,7 +80,11 @@ public void write(Void record) {
     }
   }
 
-  public Path writeDirect(String name, MessageType type, DirectWriter writer)
+  public Path writeDirect(String name, MessageType type, DirectWriter writer) throws IOException {
+    return writeDirect(name, type, writer, 1);
+  }
+
+  public Path writeDirect(String name, MessageType type, DirectWriter writer, int count)
       throws IOException {
     File temp = tempDir.newFile(name + ".parquet");
     temp.deleteOnExit();
@@ -88,8 +93,11 @@ public Path writeDirect(String name, MessageType type, DirectWriter writer)
     Path path = new Path(temp.getPath());
 
     ParquetWriter<Void> parquetWriter = new ParquetWriter<Void>(path,
-        new DirectWriteSupport(type, writer, new HashMap<String, String>()));
-    parquetWriter.write(null);
+        new DirectWriteSupport(type, writer, new HashMap<String, String>()),
+        ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME, blockSize, ParquetWriter.DEFAULT_PAGE_SIZE);
+    for (int i = 0; i < count; i++) {
+      parquetWriter.write(null);
+    }
     parquetWriter.close();
 
     return path;
@@ -188,4 +196,8 @@ protected ArrayWritableObjectInspector getObjectInspector(final String columnNam
     return columnTypes;
   }
+
+  protected void setBlockSize(int size) {
+    blockSize = size;
+  }
 
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetBlockElimination.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetBlockElimination.java
new file mode 100644
index 0000000..e22025d
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetBlockElimination.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import java.util.List;
+
+public class TestParquetBlockElimination extends AbstractTestParquetDirect {
+  JobConf conf;
+  String columnNames;
+  String columnTypes;
+  Path testPath;
+  StructObjectInspector inspector;
+
+  @Before
+  public void initConf() throws Exception {
+    conf = new JobConf();
+
+    // define schema
+    columnNames = "intCol,doubleCol,booleanCol";
+    columnTypes = "int,double,boolean";
+    inspector = getObjectInspector(columnNames, columnTypes);
+    MessageType fileSchema = MessageTypeParser.parseMessageType(
+        "message hive_schema {\n"
+            + "  optional int32 intCol;\n"
+            + "  optional double doubleCol;\n"
+            + "  optional boolean booleanCol;\n"
+            + "}\n"
+    );
+
+    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "intCol,doubleCol");
+    conf.set("columns", columnNames);
+    conf.set("columns.types", columnTypes);
+
+    // create Parquet file with specific data. 3 blocks.
+    setBlockSize(1200);
+    testPath = writeDirect("BlockElimination", fileSchema,
+        new DirectWriter() {
+          Integer value = 1;
+
+          @Override
+          public void write(RecordConsumer consumer) {
+            consumer.startMessage();
+            addField(consumer, value, value.doubleValue(), true);
+            consumer.endMessage();
+            value++;
+          }
+        }, 300);
+  }
+
+  @Test
+  public void testBlockEliminationMultiple() throws Exception {
+    // intCol > 0
+    GenericUDF udf = new GenericUDFOPGreaterThan();
+    List<ExprNodeDesc> children = Lists.newArrayList();
+    ExprNodeColumnDesc columnDesc = new ExprNodeColumnDesc(Integer.class, "intCol", "T", false);
+    ExprNodeConstantDesc constantDesc = new ExprNodeConstantDesc(0);
+    children.add(columnDesc);
+    children.add(constantDesc);
+    ExprNodeGenericFuncDesc genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
+    String searchArgumentStr = Utilities.serializeExpression(genericFuncDesc);
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
+
+    ParquetRecordReaderWrapper recordReader = (ParquetRecordReaderWrapper)
+        new MapredParquetInputFormat().getRecordReader(
+            new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
+
+    Assert.assertEquals("row group is not filtered correctly", 3, recordReader.getFilteredBlocks()
+        .size());
+
+    // > 100
+    constantDesc = new ExprNodeConstantDesc(100);
+    children.set(1, constantDesc);
+    genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
+    searchArgumentStr = Utilities.serializeExpression(genericFuncDesc);
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
+
+    recordReader = (ParquetRecordReaderWrapper)
+        new MapredParquetInputFormat().getRecordReader(
+            new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
+
+    Assert.assertEquals("row group is not filtered correctly", 2, recordReader.getFilteredBlocks()
+        .size());
+
+    // > 200
+    constantDesc = new ExprNodeConstantDesc(200);
+    children.set(1, constantDesc);
+    genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
+    searchArgumentStr = Utilities.serializeExpression(genericFuncDesc);
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
+
+    recordReader = (ParquetRecordReaderWrapper)
+        new MapredParquetInputFormat().getRecordReader(
+            new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
+
+    Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFilteredBlocks()
+        .size());
+
+    // > 300
+    constantDesc = new ExprNodeConstantDesc(300);
+    children.set(1, constantDesc);
+    genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
+    searchArgumentStr = Utilities.serializeExpression(genericFuncDesc);
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
+
+    recordReader = (ParquetRecordReaderWrapper)
+        new MapredParquetInputFormat().getRecordReader(
+            new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
+
+    Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFilteredBlocks()
+        .size());
+  }
+
+
+  @Test
+  public void testBlockEliminationComplexExpression() throws Exception {
+    // predicate: intCol > 100 and doubleCol > 200.0
+    GenericUDF udf = new GenericUDFOPGreaterThan();
+    List<ExprNodeDesc> children = Lists.newArrayList();
+    ExprNodeColumnDesc columnDesc = new ExprNodeColumnDesc(Integer.class, "intCol", "T", false);
+    ExprNodeConstantDesc constantDesc = new ExprNodeConstantDesc(100);
+    children.add(columnDesc);
+    children.add(constantDesc);
+    ExprNodeGenericFuncDesc genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
+
+    GenericUDF udf1 = new GenericUDFOPGreaterThan();
+    List<ExprNodeDesc> children1 = Lists.newArrayList();
+    ExprNodeColumnDesc columnDesc1 = new ExprNodeColumnDesc(Double.class, "doubleCol", "T", false);
+    ExprNodeConstantDesc constantDesc1 = new ExprNodeConstantDesc(200.0);
+    children1.add(columnDesc1);
+    children1.add(constantDesc1);
+    ExprNodeGenericFuncDesc genericFuncDesc1 = new ExprNodeGenericFuncDesc(inspector, udf1,
+        children1);
+
+    GenericUDF udf2 = new GenericUDFOPAnd();
+    List<ExprNodeDesc> children2 = Lists.newArrayList();
+    children2.add(genericFuncDesc);
+    children2.add(genericFuncDesc1);
+    ExprNodeGenericFuncDesc genericFuncDesc2 = new ExprNodeGenericFuncDesc(inspector, udf2,
+        children2);
+
+    String searchArgumentStr = Utilities.serializeExpression(genericFuncDesc2);
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
+
+    ParquetRecordReaderWrapper recordReader = (ParquetRecordReaderWrapper)
+        new MapredParquetInputFormat().getRecordReader(
+            new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
+
+    Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFilteredBlocks()
+        .size());
+  }
+
+  private void addField(RecordConsumer consumer, int a, double b, boolean c) {
+    consumer.startField("intCol", 0);
+    consumer.addInteger(a);
+    consumer.endField("intCol", 0);
+    consumer.startField("doubleCol", 1);
+    consumer.addDouble(b);
+    consumer.endField("doubleCol", 1);
+    consumer.startField("booleanCol", 2);
+    consumer.addBoolean(c);
+    consumer.endField("booleanCol", 2);
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
index 1b98c00..3ff9461 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
@@ -96,7 +96,7 @@ public void write(RecordConsumer consumer) {
         new MapredParquetInputFormat().getRecordReader(
             new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
 
-    Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFiltedBlocks().size());
+    Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFilteredBlocks().size());
 
     // > 100
     constantDesc = new ExprNodeConstantDesc(100);
@@ -109,6 +109,6 @@ public void write(RecordConsumer consumer) {
         new MapredParquetInputFormat().getRecordReader(
             new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
 
-    Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFiltedBlocks().size());
+    Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFilteredBlocks().size());
   }
 }