diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index fe4e366..0858fbd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -43,6 +43,8 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
+import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.RowGroupFilter;
 import parquet.filter2.predicate.FilterPredicate;
 import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.ParquetInputFormat;
@@ -72,6 +74,7 @@
   private boolean skipTimestampConversion = false;
   private JobConf jobConf;
   private final ProjectionPusher projectionPusher;
+  private List<BlockMetaData> filtedBlocks;
 
   public ParquetRecordReaderWrapper(
       final ParquetInputFormat<ArrayWritable> newInputFormat,
@@ -100,8 +103,6 @@ public ParquetRecordReaderWrapper(
       taskAttemptID = new TaskAttemptID();
     }
 
-    setFilter(jobConf);
-
     // create a TaskInputOutputContext
     Configuration conf = jobConf;
     if (skipTimestampConversion ^ HiveConf.getBoolVar(
@@ -136,13 +137,13 @@
     }
   }
 
-  public void setFilter(final JobConf conf) {
+  public FilterCompat.Filter setFilter(final JobConf conf) {
     String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
     String columnNamesString =
       conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
     if (serializedPushdown == null || columnNamesString == null || serializedPushdown.isEmpty() ||
       columnNamesString.isEmpty()) {
-      return;
+      return null;
     }
 
     FilterPredicate p =
@@ -151,9 +152,11 @@ public void setFilter(final JobConf conf) {
     if (p != null) {
       LOG.debug("Predicate filter for parquet is " + p.toString());
       ParquetInputFormat.setFilterPredicate(conf, p);
+      return FilterCompat.get(p);
     } else {
       LOG.debug("No predicate filter can be generated for " + TableScanDesc.FILTER_EXPR_CONF_STR
         + " with the value of " + serializedPushdown);
+      return null;
     }
   }
 
@@ -244,6 +247,7 @@ protected ParquetInputSplit getSplit(
     if (oldSplit instanceof FileSplit) {
       final Path finalPath = ((FileSplit) oldSplit).getPath();
       jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
+      FilterCompat.Filter filter = setFilter(jobConf);
 
       final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath);
       final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
@@ -264,24 +268,43 @@ protected ParquetInputSplit getSplit(
       }
       if (splitGroup.isEmpty()) {
         LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit);
-        split = null;
-      } else {
-        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
-          skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
+        return null;
+      }
+
+      if (filter != null) {
+        filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
+        if (filtedBlocks.isEmpty()) {
+          LOG.debug("All row groups are dropped due to filter predicates");
+          return null;
+        }
+
+        long droppedBlocks = splitGroup.size() - filtedBlocks.size();
+        if (droppedBlocks > 0) {
+          LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
        }
-        split = new ParquetInputSplit(finalPath,
-          splitStart,
-          splitLength,
-          ((FileSplit) oldSplit).getLocations(),
-          splitGroup,
-          readContext.getRequestedSchema().toString(),
-          fileMetaData.getSchema().toString(),
-          fileMetaData.getKeyValueMetaData(),
-          readContext.getReadSupportMetadata());
+      } else {
+        filtedBlocks = splitGroup;
+      }
+
+      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
+        skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
       }
+      split = new ParquetInputSplit(finalPath,
+        splitStart,
+        splitLength,
+        ((FileSplit) oldSplit).getLocations(),
+        filtedBlocks,
+        readContext.getRequestedSchema().toString(),
+        fileMetaData.getSchema().toString(),
+        fileMetaData.getKeyValueMetaData(),
+        readContext.getReadSupportMetadata());
+      return split;
     } else {
       throw new IllegalArgumentException("Unknown split type: " + oldSplit);
     }
-    return split;
+  }
+
+  public List<BlockMetaData> getFiltedBlocks() {
+    return filtedBlocks;
   }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
new file mode 100644
index 0000000..4ccb207
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
+
+  JobConf conf;
+  String columnNames;
+  String columnTypes;
+
+  @Before
+  public void initConf() throws Exception {
+    conf = new JobConf();
+
+  }
+
+  @Test
+  public void testRowGroupFilterTakeEffect() throws Exception {
+    // define schema
+    columnNames = "intCol";
+    columnTypes = "int";
+    StructObjectInspector inspector = getObjectInspector(columnNames, columnTypes);
+    MessageType fileSchema = MessageTypeParser.parseMessageType(
+        "message hive_schema {\n"
+            + "  optional int32 intCol;\n"
+            + "}\n"
+    );
+
+    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "intCol");
+    conf.set("columns", "intCol");
+    conf.set("columns.types", "int");
+
+    // create Parquet file with specific data
+    Path testPath = writeDirect("RowGroupFilterTakeEffect", fileSchema,
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer consumer) {
+            for(int i = 0; i < 100; i++) {
+              consumer.startMessage();
+              consumer.startField("int", 0);
+              consumer.addInteger(i);
+              consumer.endField("int", 0);
+              consumer.endMessage();
+            }
+          }
+        });
+
+    // > 50
+    GenericUDF udf = new GenericUDFOPGreaterThan();
+    List<ExprNodeDesc> children = Lists.newArrayList();
+    ExprNodeColumnDesc columnDesc = new ExprNodeColumnDesc(Integer.class, "intCol", "T", false);
+    ExprNodeConstantDesc constantDesc = new ExprNodeConstantDesc(50);
+    children.add(columnDesc);
+    children.add(constantDesc);
+    ExprNodeGenericFuncDesc genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
+    String searchArgumentStr = Utilities.serializeExpression(genericFuncDesc);
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
+
+    ParquetRecordReaderWrapper recordReader = (ParquetRecordReaderWrapper)
+        new MapredParquetInputFormat().getRecordReader(
+            new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
+
+    Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFiltedBlocks().size());
+
+    // > 100
+    constantDesc = new ExprNodeConstantDesc(100);
+    children.set(1, constantDesc);
+    genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
+    searchArgumentStr = Utilities.serializeExpression(genericFuncDesc);
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
+
+    recordReader = (ParquetRecordReaderWrapper)
+        new MapredParquetInputFormat().getRecordReader(
+            new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
+
+    Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFiltedBlocks().size());
+  }
+
+  private ArrayWritableObjectInspector getObjectInspector(final String columnNames, final String columnTypes) {
+    List<TypeInfo> columnTypeList = createHiveTypeInfoFrom(columnTypes);
+    List<String> columnNameList = createHiveColumnsFrom(columnNames);
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+
+    return new ArrayWritableObjectInspector(rowTypeInfo);
+  }
+
+  private List<String> createHiveColumnsFrom(final String columnNamesStr) {
+    List<String> columnNames;
+    if (columnNamesStr.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNamesStr.split(","));
+    }
+
+    return columnNames;
+  }
+
+  private List<TypeInfo> createHiveTypeInfoFrom(final String columnsTypeStr) {
+    List<TypeInfo> columnTypes;
+
+    if (columnsTypeStr.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr);
+    }
+
+    return columnTypes;
+  }
+}