diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/LeafFilterFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/LeafFilterFactory.java
index fb41768..e9ca077 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/LeafFilterFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/LeafFilterFactory.java
@@ -162,6 +162,7 @@ public FilterPredicateLeafBuilder getLeafFilterBuilderByType(PredicateLeaf.Type
         return new IntFilterPredicateLeafBuilder();
       case DECIMAL:
       case TIMESTAMP:
+        return new LongFilterPredicateLeafBuilder();
       default:
         LOG.debug("Conversion to Parquet FilterPredicate not supported for " + type);
         return null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
index 6aa4f34..449ef5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
@@ -137,6 +137,8 @@ public Object getLiteral(FileFormat format) {
       Object constant = ((ExprNodeConstantDesc) literal).getValue();
       if (constant instanceof Date) {
         return ((Date) constant).getTime();
+      } else if (constant instanceof Timestamp) {
+        return ((Timestamp) constant).getTime();
       }
     }
     return literal;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetPPD.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetPPD.java
index 4f1b8d8..083ec60 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetPPD.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetPPD.java
@@ -20,6 +20,7 @@
 import com.google.common.collect.Lists;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
@@ -42,6 +43,7 @@
 import parquet.schema.MessageTypeParser;
 
 import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -121,6 +123,91 @@ public void write(RecordConsumer consumer) {
 
     Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFiltedBlocks().size());
   }
+
+  @Test
+  public void testParquetPPDTimestamp() throws Exception {
+    // define schema
+    columnNames = "tsCol";
+    columnTypes = "timestamp";
+    StructObjectInspector inspector = getObjectInspector(columnNames, columnTypes);
+    MessageType fileSchema = MessageTypeParser.parseMessageType(
+        "message hive_schema {\n" +
+            "  optional int64 tsCol (TIMESTAMP_MILLIS);\n" +
+            "}\n"
+    );
+
+    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "tsCol");
+    conf.set("columns", columnNames);
+    conf.set("columns.types", columnTypes);
+
+    // create Parquet file with specific data
+    final String[] ts = new String[] {"2011-01-01 01:01:01.111111111",
+        "2012-02-02 02:02:02.222222222",
+        "2013-03-03 03:03:03.333333333",
+        "2014-04-04 04:04:04.444444444",
+        "2015-05-05 05:05:05.555555555",
+        "2016-06-06 06:06:06.666666666",
+        "2017-07-07 07:07:07.777777777",
+        "2018-08-08 08:08:08.888888888",
+        "2019-09-09 09:09:09.999999999",
+        "2020-10-10 10:10:10.101010101",
+        "2021-11-11 11:11:11.111111111",
+        "2022-12-12 12:12:12.121212121",
+        "2023-01-02 13:13:13.131313131",
+        "2024-02-02 14:14:14.141414141",
+        "2025-03-03 15:15:15.151515151",
+        "2026-04-04 16:16:16.161616161",
+        "2027-05-05 17:17:17.171717171",
"2028-06-06 18:18:18.181818181", + "2029-07-07 19:19:19.191919191", + "2030-08-08 20:20:20.202020202"}; + + Path testPath = writeDirect("RowGroupFilterTakeEffect", fileSchema, + new DirectWriter() { + @Override + public void write(RecordConsumer consumer) { + int date = 0; + for(int i = 0; i < 20; i++) { + consumer.startMessage(); + consumer.startField("int64", 0); + consumer.addLong(Timestamp.valueOf(ts[i]).getTime()); + consumer.endField("int64", 0); + consumer.endMessage(); + } + } + }); + + // not filtered + GenericUDF udf = new GenericUDFOPGreaterThan(); + List children = Lists.newArrayList(); + ExprNodeColumnDesc columnDesc = new ExprNodeColumnDesc(Timestamp.class, "tsCol", "T", false); + ExprNodeConstantDesc constantDesc = new ExprNodeConstantDesc(Timestamp.valueOf("2027-05-05 17:17:17.171717171")); + children.add(columnDesc); + children.add(constantDesc); + ExprNodeGenericFuncDesc genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children); + String searchArgumentStr = Utilities.serializeExpression(genericFuncDesc); + conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr); + + ParquetRecordReaderWrapper recordReader = (ParquetRecordReaderWrapper) + new MapredParquetInputFormat().getRecordReader( + new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null); + + Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFiltedBlocks().size()); + + // filtered + constantDesc = new ExprNodeConstantDesc(Timestamp.valueOf("2031-09-09 21:21:21.212121212")); + children.set(1, constantDesc); + genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children); + searchArgumentStr = Utilities.serializeExpression(genericFuncDesc); + conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr); + + recordReader = (ParquetRecordReaderWrapper) + new MapredParquetInputFormat().getRecordReader( + new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null); + + Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFiltedBlocks().size()); + } + //TODO: add tests to cover more data types }