diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
index 167f9b6..453d6e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
@@ -24,7 +24,6 @@
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.parquet.filter2.compat.FilterCompat;
-import org.apache.parquet.filter2.compat.RowGroupFilter;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetInputFormat;
@@ -53,7 +52,6 @@
   protected JobConf jobConf;
 
   protected int schemaSize;
-  protected List<BlockMetaData> filtedBlocks;
   protected ParquetFileReader reader;
 
   /**
@@ -106,34 +104,17 @@ protected ParquetInputSplit getSplit(
         return null;
       }
 
-      FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
-      if (filter != null) {
-        filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
-        if (filtedBlocks.isEmpty()) {
-          LOG.debug("All row groups are dropped due to filter predicates");
-          return null;
-        }
-
-        long droppedBlocks = splitGroup.size() - filtedBlocks.size();
-        if (droppedBlocks > 0) {
-          LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
-        }
-      } else {
-        filtedBlocks = splitGroup;
-      }
+      setFilter(jobConf, fileMetaData.getSchema());
 
       if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
         skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
       }
       split = new ParquetInputSplit(finalPath,
-        splitStart,
-        splitLength,
-        oldSplit.getLocations(),
-        filtedBlocks,
-        readContext.getRequestedSchema().toString(),
-        fileMetaData.getSchema().toString(),
-        fileMetaData.getKeyValueMetaData(),
-        readContext.getReadSupportMetadata());
+        splitStart,
+        splitStart + splitLength,
+        splitLength,
+        oldSplit.getLocations(),
+        null);
       return split;
     } else {
       throw new IllegalArgumentException("Unknown split type: " + oldSplit);
@@ -161,10 +142,6 @@ protected ParquetInputSplit getSplit(
     }
   }
 
-  public List<BlockMetaData> getFiltedBlocks() {
-    return filtedBlocks;
-  }
-
   public SerDeStats getStats() {
     return serDeStats;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index ac430a6..8d6696e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -88,6 +88,11 @@ public ParquetRecordReaderWrapper(
           HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION, skipTimestampConversion);
     }
 
+    if (jobConf.get(ParquetInputFormat.FILTER_PREDICATE) != null) {
+      conf.set(ParquetInputFormat.FILTER_PREDICATE,
+          jobConf.get(ParquetInputFormat.FILTER_PREDICATE));
+    }
+
     final TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
     if (split != null) {
       try {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
index bf363f3..8e8de19 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
@@ -24,8 +24,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -41,6 +43,13 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
@@ -50,6 +59,9 @@
 
 import com.google.common.collect.Lists;
 
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.offsets;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+
 public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
 
   JobConf conf;
@@ -104,11 +116,21 @@ public void write(RecordConsumer consumer) {
     String searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
 
-    ParquetRecordReaderWrapper recordReader = (ParquetRecordReaderWrapper)
-        new MapredParquetInputFormat().getRecordReader(
-        new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
+    FileSplit testSplit = new FileSplit(testPath, 0, fileLength(testPath), (String[]) null);
+
+    List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();
+    levels.add(RowGroupFilter.FilterLevel.DICTIONARY);
+    levels.add(RowGroupFilter.FilterLevel.STATISTICS);
+
+    ParquetFileReader fileReader = ParquetFileReader.open(conf,
+        testPath,
+        ParquetMetadataConverter.range(testSplit.getStart(), testSplit.getStart() + testSplit.getLength()));
+    SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema);
+    ParquetInputFormat.setFilterPredicate(conf, p);
+    List<BlockMetaData> blocks = RowGroupFilter.filterRowGroups(levels, FilterCompat.get(p), fileReader.getFooter().getBlocks(), fileReader);
 
-    Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFiltedBlocks().size());
+    Assert.assertEquals("row group is not filtered correctly", 1, blocks.size());
 
     // > 100
     constantDesc = new ExprNodeConstantDesc(100);
@@ -117,11 +139,15 @@ public void write(RecordConsumer consumer) {
     searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
 
-    recordReader = (ParquetRecordReaderWrapper)
-        new MapredParquetInputFormat().getRecordReader(
-        new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
+    fileReader = ParquetFileReader.open(conf,
+        testPath,
+        ParquetMetadataConverter.range(testSplit.getStart(), testSplit.getStart() + testSplit.getLength()));
+    sarg = ConvertAstToSearchArg.createFromConf(conf);
+    p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema);
+    ParquetInputFormat.setFilterPredicate(conf, p);
+    blocks = RowGroupFilter.filterRowGroups(levels, FilterCompat.get(p), fileReader.getFooter().getBlocks(), fileReader);
 
-    Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFiltedBlocks().size());
+    Assert.assertEquals("row group is not filtered correctly", 0, blocks.size());
   }
 
   private ArrayWritableObjectInspector getObjectInspector(final String columnNames, final String columnTypes) {
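
Note on the filtering flow the patch relies on: row-group pruning now happens inside Parquet itself once the predicate is published under ParquetInputFormat.FILTER_PREDICATE, which is what the wrapper change propagates and what the updated test exercises through the filter2 API. The standalone sketch below shows that same flow outside Hive, assuming a Parquet 1.9+ classpath; the class name, the input path taken from args[0], and the int32 column name "intCol" are placeholders, not part of the patch.

  import java.util.Arrays;
  import java.util.List;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.filter2.compat.FilterCompat;
  import org.apache.parquet.filter2.compat.RowGroupFilter;
  import org.apache.parquet.filter2.predicate.FilterApi;
  import org.apache.parquet.filter2.predicate.FilterPredicate;
  import org.apache.parquet.format.converter.ParquetMetadataConverter;
  import org.apache.parquet.hadoop.ParquetFileReader;
  import org.apache.parquet.hadoop.ParquetInputFormat;
  import org.apache.parquet.hadoop.metadata.BlockMetaData;

  public class RowGroupFilterSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Path file = new Path(args[0]);  // placeholder: any Parquet file with an int32 column "intCol"

      // Predicate equivalent to "intCol = 100", published under
      // ParquetInputFormat.FILTER_PREDICATE so Parquet readers pick it up.
      FilterPredicate p = FilterApi.eq(FilterApi.intColumn("intCol"), 100);
      ParquetInputFormat.setFilterPredicate(conf, p);

      // Read the footer and drop row groups via dictionary- and statistics-level filtering,
      // mirroring the levels used in TestParquetRowGroupFilter.
      try (ParquetFileReader reader =
               ParquetFileReader.open(conf, file, ParquetMetadataConverter.NO_FILTER)) {
        List<BlockMetaData> kept = RowGroupFilter.filterRowGroups(
            Arrays.asList(RowGroupFilter.FilterLevel.DICTIONARY,
                RowGroupFilter.FilterLevel.STATISTICS),
            FilterCompat.get(p),
            reader.getFooter().getBlocks(),
            reader);
        System.out.println("Row groups kept after filtering: " + kept.size());
      }
    }
  }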