diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/LeafFilterFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/LeafFilterFactory.java
index 83865e8..fb41768 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/LeafFilterFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/LeafFilterFactory.java
@@ -159,6 +159,7 @@ public FilterPredicateLeafBuilder getLeafFilterBuilderByType(PredicateLeaf.Type
       case BOOLEAN:
         return new BooleanFilterPredicateLeafBuilder();
       case DATE:
+        return new IntFilterPredicateLeafBuilder();
       case DECIMAL:
       case TIMESTAMP:
       default:
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
index a451bfb..6aa4f34 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
@@ -55,6 +55,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 import java.math.BigDecimal;
+import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -132,6 +133,12 @@ public Object getLiteral(FileFormat format) {
         }
         return literal;
       case PARQUET:
+        if (literal instanceof ExprNodeConstantDesc) {
+          Object constant = ((ExprNodeConstantDesc) literal).getValue();
+          if (constant instanceof Date) {
+            return ((Date) constant).getTime();
+          }
+        }
         return literal;
       default:
         throw new RuntimeException(
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java
index 3a47673..d44385e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java
@@ -12,8 +12,13 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileSplit;
@@ -151,4 +156,36 @@ public void deserialize(Writable record, List<String> columnNames,
     serde.initialize(null, props);
     serde.deserialize(record);
   }
+
+  protected ArrayWritableObjectInspector getObjectInspector(final String columnNames,
+      final String columnTypes) {
+    List<TypeInfo> columnTypeList = createHiveTypeInfoFrom(columnTypes);
+    List<String> columnNameList = createHiveColumnsFrom(columnNames);
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+
+    return new ArrayWritableObjectInspector(rowTypeInfo);
+  }
+
+  private List<String> createHiveColumnsFrom(final String columnNamesStr) {
+    List<String> columnNames;
+    if (columnNamesStr.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNamesStr.split(","));
+    }
+
+    return columnNames;
+  }
+
+  private List<TypeInfo> createHiveTypeInfoFrom(final String columnsTypeStr) {
+    List<TypeInfo> columnTypes;
+
+    if (columnsTypeStr.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr);
+    }
+
+    return columnTypes;
+  }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetPPD.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetPPD.java
new file mode 100644
index 0000000..4f1b8d8
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetPPD.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestParquetPPD extends AbstractTestParquetDirect {
+  JobConf conf;
+  String columnNames;
+  String columnTypes;
+
+  @Before
+  public void initConf() throws Exception {
+    conf = new JobConf();
+
+  }
+
+  @Test
+  public void testParquetPPDDate() throws Exception {
+    // define schema
+    columnNames = "dateCol";
+    columnTypes = "date";
+    StructObjectInspector inspector = getObjectInspector(columnNames, columnTypes);
+    MessageType fileSchema = MessageTypeParser.parseMessageType(
+        "message hive_schema {\n"
+            + "  optional int32 dateCol (DATE);\n"
+            + "}\n"
+    );
+
+    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "dateCol");
+    conf.set("columns", columnNames);
+    conf.set("columns.types", columnTypes);
+
+    // create Parquet file with specific data
+    Path testPath = writeDirect("RowGroupFilterTakeEffect", fileSchema,
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer consumer) {
+            int date = 0;
+            for(int i = 0; i < 100; i++) {
+              consumer.startMessage();
+              consumer.startField("int", 0);
+              consumer.addInteger(date);
+              consumer.endField("int", 0);
+              consumer.endMessage();
+              date += 10000;
+            }
+          }
+        });
+
+    // > 500000
+    GenericUDF udf = new GenericUDFOPGreaterThan();
+    List<ExprNodeDesc> children = Lists.newArrayList();
+    ExprNodeColumnDesc columnDesc = new ExprNodeColumnDesc(Date.class, "dateCol", "T", false);
+    ExprNodeConstantDesc constantDesc = new ExprNodeConstantDesc(new Date(500000));
+    children.add(columnDesc);
+    children.add(constantDesc);
+    ExprNodeGenericFuncDesc genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
+    String searchArgumentStr = Utilities.serializeExpression(genericFuncDesc);
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
+
+    ParquetRecordReaderWrapper recordReader = (ParquetRecordReaderWrapper)
+        new MapredParquetInputFormat().getRecordReader(
+            new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
+
+    Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFiltedBlocks().size());
+
+    // > 1000000
+    constantDesc = new ExprNodeConstantDesc(new Date(1000000));
+    children.set(1, constantDesc);
+    genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
+    searchArgumentStr = Utilities.serializeExpression(genericFuncDesc);
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
+
+    recordReader = (ParquetRecordReaderWrapper)
+        new MapredParquetInputFormat().getRecordReader(
+            new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
+
+    Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFiltedBlocks().size());
+  }
+
+  //TODO: add tests to cover more data types
+
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
index 4ccb207..1b98c00 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
@@ -22,16 +22,11 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
-import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
 import org.apache.hadoop.hive.ql.plan.*;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.Assert;
@@ -41,8 +36,6 @@
 import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
@@ -118,35 +111,4 @@ public void write(RecordConsumer consumer) {
 
     Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFiltedBlocks().size());
   }
-
-  private ArrayWritableObjectInspector getObjectInspector(final String columnNames, final String columnTypes) {
-    List<TypeInfo> columnTypeList = createHiveTypeInfoFrom(columnTypes);
-    List<String> columnNameList = createHiveColumnsFrom(columnNames);
-    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
-
-    return new ArrayWritableObjectInspector(rowTypeInfo);
-  }
-
-  private List<String> createHiveColumnsFrom(final String columnNamesStr) {
-    List<String> columnNames;
-    if (columnNamesStr.length() == 0) {
-      columnNames = new ArrayList<String>();
-    } else {
-      columnNames = Arrays.asList(columnNamesStr.split(","));
-    }
-
-    return columnNames;
-  }
-
-  private List<TypeInfo> createHiveTypeInfoFrom(final String columnsTypeStr) {
-    List<TypeInfo> columnTypes;
-
-    if (columnsTypeStr.length() == 0) {
-      columnTypes = new ArrayList<TypeInfo>();
-    } else {
-      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr);
-    }
-
-    return columnTypes;
-  }
 }