diff --git common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
index f2c1493f56..0193aba0f7 100644
--- common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
+++ common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
@@ -21,6 +21,7 @@
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
@@ -182,6 +183,10 @@ public static Timestamp ofEpochSecond(long epochSecond, int nanos) {
         LocalDateTime.ofEpochSecond(epochSecond, nanos, ZoneOffset.UTC));
   }
 
+  public static Timestamp ofEpochSecond(long epochSecond, long nanos, ZoneId zone) {
+    return new Timestamp(LocalDateTime.ofInstant(Instant.ofEpochSecond(epochSecond, nanos), zone));
+  }
+
   public static Timestamp ofEpochMilli(long epochMilli) {
     return new Timestamp(LocalDateTime
         .ofInstant(Instant.ofEpochMilli(epochMilli), ZoneOffset.UTC));
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
index d67b030648..490b71e0a4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.ParquetTimestampUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -48,6 +49,7 @@
 import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
 import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
@@ -683,6 +685,21 @@ protected TimestampWritableV2 convert(Binary binary) {
       };
     }
   },
+  EINT64_TIMESTAMP_CONVERTER(TimestampWritableV2.class) {
+    @Override
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent,
+        TypeInfo hiveTypeInfo) {
+      return new PrimitiveConverter() {
+        @Override
+        public void addLong(final long value) {
+          TimestampLogicalTypeAnnotation logicalType = (TimestampLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
+          Timestamp timestamp =
+              ParquetTimestampUtils.getTimestamp(value, logicalType.getUnit(), logicalType.isAdjustedToUTC());
+          parent.set(index, new TimestampWritableV2(timestamp));
+        }
+      };
+    }
+  },
   EDATE_CONVERTER(DateWritableV2.class) {
     @Override
     PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
@@ -730,6 +747,11 @@ public static PrimitiveConverter getNewConverter(final PrimitiveType type, final
       public Optional<PrimitiveConverter> visit(DateLogicalTypeAnnotation logicalTypeAnnotation) {
         return Optional.of(EDATE_CONVERTER.getConverter(type, index, parent, hiveTypeInfo));
       }
+
+      @Override
+      public Optional<PrimitiveConverter> visit(TimestampLogicalTypeAnnotation logicalTypeAnnotation) {
+        return Optional.of(EINT64_TIMESTAMP_CONVERTER.getConverter(type, index, parent, hiveTypeInfo));
+      }
     });
 
     if (converter.isPresent()) {
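Reviewer note: the EINT64_TIMESTAMP_CONVERTER above delegates all unit and zone
handling to the new ParquetTimestampUtils class (next file), which in turn is
built on the zone-aware Timestamp.ofEpochSecond overload added in the first
hunk. A minimal sketch of how the two overloads differ; the epoch value and the
demo class are illustrative, not part of the patch:

    import java.time.ZoneId;
    import org.apache.hadoop.hive.common.type.Timestamp;

    public class OfEpochSecondDemo {
      public static void main(String[] args) {
        long epochSecond = 1531667540L; // 2018-07-15 15:12:20 UTC
        // Existing overload: renders the instant as a UTC wall-clock time.
        Timestamp utc = Timestamp.ofEpochSecond(epochSecond, 0);
        // New overload: renders the same instant in the supplied zone.
        Timestamp local = Timestamp.ofEpochSecond(epochSecond, 0L, ZoneId.systemDefault());
        System.out.println(utc);   // 2018-07-15 15:12:20
        System.out.println(local); // shifted by the host offset, e.g. 17:12:20 under Europe/Berlin
      }
    }

The zone-aware overload is what lets isAdjustedToUTC=true values be rendered in
the reader's local zone below.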
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/ParquetTimestampUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/ParquetTimestampUtils.java
new file mode 100644
index 0000000000..7626583fbb
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/ParquetTimestampUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.timestamp;
+
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+
+public class ParquetTimestampUtils {
+  public static Timestamp getTimestamp(long value, TimeUnit timeUnit, boolean isAdjustedToUTC) {
+
+    ZoneId zone = ZoneOffset.UTC;
+    if (isAdjustedToUTC) {
+      zone = ZoneId.systemDefault();
+    }
+    long seconds = 0L;
+    long nanoseconds = 0L;
+
+    switch (timeUnit) {
+    case MILLIS:
+      seconds = value / 1000;
+      nanoseconds = (value % 1000) * 1000000;
+      break;
+
+    case MICROS:
+      seconds = value / 1000000;
+      nanoseconds = (value % 1000000) * 1000;
+      break;
+
+    case NANOS:
+      seconds = value / 1000000000;
+      nanoseconds = (value % 1000000000);
+      break;
+    default:
+      break;
+    }
+    return Timestamp.ofEpochSecond(seconds, nanoseconds, zone);
+  }
+}
\ No newline at end of file
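Reviewer note: a worked example of the MICROS branch above, assuming the two
classes from this patch are on the classpath (the input value and the demo
class are illustrative):

    import org.apache.hadoop.hive.common.type.Timestamp;
    import org.apache.hadoop.hive.ql.io.parquet.timestamp.ParquetTimestampUtils;
    import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;

    public class Int64TimestampDemo {
      public static void main(String[] args) {
        // 1531667540112233 us = 1531667540 s + 112233 us, and 112233 us = 112233000 ns.
        long micros = 1531667540112233L;
        Timestamp ts = ParquetTimestampUtils.getTimestamp(micros, TimeUnit.MICROS, false);
        System.out.println(ts); // 2018-07-15 15:12:20.112233 (rendered as UTC)
      }
    }

Integer division and remainder keep the sub-second part exact for positive
values; for pre-epoch values the negative remainder is normalized inside
Instant.ofEpochSecond(seconds, nanoAdjustment), which the new
Timestamp.ofEpochSecond overload uses.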
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
index 519bd813e9..10dfe22426 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.ParquetTimestampUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
@@ -43,6 +44,8 @@
 import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
 import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
@@ -427,6 +430,9 @@ long validatedLong(long longValue, String typeName) {
    */
  public static class TypesFromInt64PageReader extends DefaultParquetDataColumnReader {
+    private boolean isAdjustedToUTC;
+    private TimeUnit timeUnit;
+
     public TypesFromInt64PageReader(ValuesReader realReader, int length, int precision, int scale) {
       super(realReader, length, precision, scale);
     }
 
@@ -435,6 +441,18 @@ public TypesFromInt64PageReader(Dictionary dict, int length, int precision, int
       super(dict, length, precision, scale);
     }
 
+    public TypesFromInt64PageReader(ValuesReader realReader, int length, boolean isAdjustedToUTC, TimeUnit timeUnit) {
+      super(realReader, length);
+      this.isAdjustedToUTC = isAdjustedToUTC;
+      this.timeUnit = timeUnit;
+    }
+
+    public TypesFromInt64PageReader(Dictionary dict, int length, boolean isAdjustedToUTC, TimeUnit timeUnit) {
+      super(dict, length);
+      this.isAdjustedToUTC = isAdjustedToUTC;
+      this.timeUnit = timeUnit;
+    }
+
     @Override
     public long readInteger() {
       return super.validatedLong(valuesReader.readLong(), serdeConstants.INT_TYPE_NAME);
@@ -533,6 +551,21 @@ public double readDouble(int id) {
       return convertToBytes(value);
     }
 
+    private Timestamp convert(Long value) {
+      Timestamp timestamp = ParquetTimestampUtils.getTimestamp(value, timeUnit, isAdjustedToUTC);
+      return timestamp;
+    }
+
+    @Override
+    public Timestamp readTimestamp(int id) {
+      return convert(dict.decodeToLong(id));
+    }
+
+    @Override
+    public Timestamp readTimestamp() {
+      return convert(valuesReader.readLong());
+    }
+
     private static String convertToString(long value) {
       return Long.toString(value);
     }
@@ -1844,20 +1877,29 @@ private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(boolean i
           hiveScale);
     }
     case INT64:
+      LogicalTypeAnnotation logicalType = parquetType.getLogicalTypeAnnotation();
+      if (logicalType instanceof TimestampLogicalTypeAnnotation) {
+        TimestampLogicalTypeAnnotation timestampLogicalType = (TimestampLogicalTypeAnnotation) logicalType;
+        boolean isAdjustedToUTC = timestampLogicalType.isAdjustedToUTC();
+        TimeUnit timeUnit = timestampLogicalType.getUnit();
+        return isDictionary ? new TypesFromInt64PageReader(dictionary, length, isAdjustedToUTC, timeUnit)
+            : new TypesFromInt64PageReader(valuesReader, length, isAdjustedToUTC, timeUnit);
+      }
+
       if (ETypeConverter.isUnsignedInteger(parquetType)) {
-        return isDictionary ? new TypesFromUInt64PageReader(dictionary, length, hivePrecision,
-            hiveScale) : new TypesFromUInt64PageReader(valuesReader, length, hivePrecision,
-            hiveScale);
-      } else if (parquetType.getLogicalTypeAnnotation() instanceof DecimalLogicalTypeAnnotation) {
-        DecimalLogicalTypeAnnotation logicalType = (DecimalLogicalTypeAnnotation) parquetType.getLogicalTypeAnnotation();
-        final short scale = (short) logicalType.getScale();
+        return isDictionary ? new TypesFromUInt64PageReader(dictionary, length, hivePrecision, hiveScale)
+            : new TypesFromUInt64PageReader(valuesReader, length, hivePrecision, hiveScale);
+      }
+
+      if (logicalType instanceof DecimalLogicalTypeAnnotation) {
+        DecimalLogicalTypeAnnotation decimalLogicalType = (DecimalLogicalTypeAnnotation) logicalType;
+        final short scale = (short) decimalLogicalType.getScale();
         return isDictionary ? new TypesFromInt64DecimalPageReader(dictionary, length, scale, hivePrecision, hiveScale)
             : new TypesFromInt64DecimalPageReader(valuesReader, length, scale, hivePrecision, hiveScale);
-      } else {
-        return isDictionary ? new TypesFromInt64PageReader(dictionary, length, hivePrecision,
-            hiveScale) : new TypesFromInt64PageReader(valuesReader, length, hivePrecision,
-            hiveScale);
       }
+
+      return isDictionary ? new TypesFromInt64PageReader(dictionary, length, hivePrecision, hiveScale)
+          : new TypesFromInt64PageReader(valuesReader, length, hivePrecision, hiveScale);
     case FLOAT:
       return isDictionary ? new TypesFromFloatPageReader(dictionary, length, hivePrecision, hiveScale)
          : new TypesFromFloatPageReader(valuesReader, length, hivePrecision, hiveScale);
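Reviewer note: to make the new INT64 dispatch concrete, this standalone sketch
builds the same annotated type through parquet-mr's public API and inspects it
the way the factory branch above does (the column name "ts" and the demo class
are made up for illustration):

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
    import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Types;

    public class Int64TypeInspectionDemo {
      public static void main(String[] args) {
        PrimitiveType parquetType = Types.optional(PrimitiveTypeName.INT64)
            .as(LogicalTypeAnnotation.timestampType(/* isAdjustedToUTC */ false, TimeUnit.MICROS))
            .named("ts");
        LogicalTypeAnnotation logicalType = parquetType.getLogicalTypeAnnotation();
        if (logicalType instanceof TimestampLogicalTypeAnnotation) {
          TimestampLogicalTypeAnnotation t = (TimestampLogicalTypeAnnotation) logicalType;
          System.out.println(t.getUnit() + ", isAdjustedToUTC=" + t.isAdjustedToUTC());
          // prints: MICROS, isAdjustedToUTC=false
        }
      }
    }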
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
index 2803baf90c..26ce5732c0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
@@ -448,6 +448,9 @@ private void readTimestamp(int total, TimestampColumnVector c, int rowId) throws
       case INT96:
         c.set(rowId, dataColumn.readTimestamp().toSqlTimestamp());
         break;
+      case INT64:
+        c.set(rowId, dataColumn.readTimestamp().toSqlTimestamp());
+        break;
       default:
         throw new IOException(
             "Unsupported parquet logical type: " + type.getLogicalTypeAnnotation().toString() + " for timestamp");
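Reviewer note: the INT64 case can share the INT96 body because
TypesFromInt64PageReader.readTimestamp() (added above) already returns a fully
converted Hive Timestamp. The tests that follow build raw INT64 cell values
from a Hive Timestamp; a sketch of that encoding arithmetic, mirroring the
MICROS and NANOS test bodies (the demo class is made up; note that
toEpochSecond() interprets the wall-clock value as UTC):

    import org.apache.hadoop.hive.common.type.Timestamp;

    public class EncodeInt64Demo {
      public static void main(String[] args) {
        Timestamp ts = Timestamp.valueOf("2018-07-15 15:12:20.112233");
        long micros = ts.toEpochSecond() * 1000000L + ts.getNanos() / 1000; // MICROS cell value
        long nanos = ts.toEpochSecond() * 1000000000L + ts.getNanos();      // NANOS cell value
        System.out.println(micros); // 1531667540112233
        System.out.println(nanos);  // 1531667540112233000
      }
    }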
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java
index f6ee57140c..4ff579d8bb 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java
@@ -21,6 +21,8 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.time.ZoneId;
+
 import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter.BinaryConverter;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
@@ -43,6 +45,8 @@
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type.Repetition;
@@ -114,6 +118,50 @@ public void testGetTimestampConverter() throws Exception {
     assertEquals(timestamp.getNanos(), timestampWritable.getNanos());
   }
 
+  @Test
+  public void testGetInt64MillisTimestampConverter() throws Exception {
+    Timestamp timestamp = Timestamp.valueOf("2018-07-15 15:12:20.112");
+    PrimitiveType primitiveType = createInt64TimestampType(false, TimeUnit.MILLIS);
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, timestamp.toEpochMilli());
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    assertEquals(timestamp.toEpochMilli(), timestampWritable.getTimestamp().toEpochMilli());
+  }
+
+  @Test
+  public void testGetInt64MicrosTimestampConverter() throws Exception {
+    Timestamp timestamp = Timestamp.valueOf("2018-07-15 15:12:20.112233");
+    PrimitiveType primitiveType = createInt64TimestampType(false, TimeUnit.MICROS);
+    long time = timestamp.toEpochSecond() * 1000000 + timestamp.getNanos() / 1000;
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, time);
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    assertEquals(timestamp.toEpochMilli(), timestampWritable.getTimestamp().toEpochMilli());
+    assertEquals(timestamp.getNanos(), timestampWritable.getNanos());
+  }
+
+  @Test
+  public void testGetInt64NanosTimestampConverter() throws Exception {
+    Timestamp timestamp = Timestamp.valueOf("2018-07-15 15:12:20.11223344");
+    PrimitiveType primitiveType = createInt64TimestampType(false, TimeUnit.NANOS);
+    long time = timestamp.toEpochSecond() * 1000000000 + timestamp.getNanos();
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, time);
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    assertEquals(timestamp.toEpochMilli(), timestampWritable.getTimestamp().toEpochMilli());
+    assertEquals(timestamp.getNanos(), timestampWritable.getNanos());
+  }
+
+  @Test
+  public void testGetInt64NanosAdjustedToUTCTimestampConverter() throws Exception {
+    ZoneId zone = ZoneId.systemDefault();
+    Timestamp timestamp = Timestamp.valueOf("2018-07-15 15:12:20.11223344");
+    PrimitiveType primitiveType = createInt64TimestampType(true, TimeUnit.NANOS);
+    long time = timestamp.toEpochSecond() * 1000000000 + timestamp.getNanos();
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, time);
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    timestamp = Timestamp.ofEpochSecond(timestamp.toEpochSecond(), timestamp.getNanos(), zone);
+    assertEquals(timestamp.toEpochMilli(), timestampWritable.getTimestamp().toEpochMilli());
+    assertEquals(timestamp.getNanos(), timestampWritable.getNanos());
+  }
+
   @Test
   public void testGetTextConverter() throws Exception {
     PrimitiveType primitiveType = Types.optional(PrimitiveTypeName.BINARY)
@@ -292,9 +340,23 @@ private Writable getWritableFromPrimitiveConverter(TypeInfo hiveTypeInfo, Primit
     return converterParent.getValue();
   }
 
+  private Writable getWritableFromPrimitiveConverter(TypeInfo hiveTypeInfo, PrimitiveType primitiveType,
+      Long valueToAdd) {
+    MyConverterParent converterParent = new MyConverterParent();
+    PrimitiveConverter converter = ETypeConverter.getNewConverter(primitiveType, 1, converterParent, hiveTypeInfo);
+    ((PrimitiveConverter) converter).addLong(valueToAdd);
+    return converterParent.getValue();
+  }
+
   private PrimitiveTypeInfo createHiveTypeInfo(String typeName) {
     PrimitiveTypeInfo hiveTypeInfo = new PrimitiveTypeInfo();
     hiveTypeInfo.setTypeName(typeName);
     return hiveTypeInfo;
   }
+
+  private PrimitiveType createInt64TimestampType(boolean isAdjustedToUTC, TimeUnit unit) {
+    TimestampLogicalTypeAnnotation logicalType = TimestampLogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit);
+    PrimitiveType primitiveType = Types.optional(PrimitiveTypeName.INT64).as(logicalType).named("value");
+    return primitiveType;
+  }
 }