diff --git common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
index a8b7b6d186..97a3ce1e6d 100644
--- common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
+++ common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
@@ -19,6 +19,7 @@
 import java.time.Instant;
 import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
@@ -135,6 +136,13 @@ public long toEpochMilli() {
     return localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
   }
 
+  /**
+   * @return milliseconds since epoch at time zone of zoneId
+   */
+  public long toEpochMilli(ZoneId zoneId) {
+    return localDateTime.toInstant(zoneId.getRules().getOffset(localDateTime)).toEpochMilli();
+  }
+
   public void setTimeInMillis(long epochMilli) {
     localDateTime = LocalDateTime.ofInstant(
         Instant.ofEpochMilli(epochMilli), ZoneOffset.UTC);
@@ -181,8 +189,11 @@ public static Timestamp ofEpochMilli(long epochMilli) {
   }
 
   public static Timestamp ofEpochMilli(long epochMilli, int nanos) {
-    return new Timestamp(LocalDateTime
-        .ofInstant(Instant.ofEpochMilli(epochMilli), ZoneOffset.UTC)
+    return ofEpochMilli(epochMilli, nanos, ZoneOffset.UTC);
+  }
+
+  public static Timestamp ofEpochMilli(long epochMilli, int nanos, ZoneId zoneId) {
+    return new Timestamp(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMilli), zoneId)
         .withNano(nanos));
   }
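For illustration only (not part of the patch): a minimal sketch of how the two zone-aware overloads added above behave, assuming the patched org.apache.hadoop.hive.common.type.Timestamp is on the classpath. Everything except the Hive Timestamp API calls (valueOf, getNanos, toEpochMilli, ofEpochMilli) is hypothetical scaffolding.

```java
import java.time.ZoneId;
import org.apache.hadoop.hive.common.type.Timestamp;

public class TimestampZoneExample {
  public static void main(String[] args) {
    ZoneId rome = ZoneId.of("Europe/Rome");
    Timestamp ts = Timestamp.valueOf("2019-01-01 00:30:30.111111111");

    // The zone-less overload keeps interpreting the wall clock as UTC.
    long utcMillis = ts.toEpochMilli();

    // The new overload applies the zone's offset for that local date-time
    // (Europe/Rome is UTC+1 in January), so the same wall clock maps to an
    // instant one hour earlier on the epoch axis.
    long romeMillis = ts.toEpochMilli(rome);
    System.out.println(utcMillis - romeMillis); // 3600000

    // Round trip: rebuilding the timestamp in the same zone restores the wall clock.
    Timestamp back = Timestamp.ofEpochMilli(romeMillis, ts.getNanos(), rome);
    System.out.println(back); // 2019-01-01 00:30:30.111111111
  }
}
```

The point of the ZoneId overloads is exactly this: the same wall-clock value maps to different instants depending on whose offset is applied, which is what lets the Parquet read path honour the writer's zone below.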
diff --git data/files/parquet_historical_timestamp_legacy.parq data/files/parquet_historical_timestamp_legacy.parq
new file mode 100644
index 0000000000..3dd855140b
Binary files /dev/null and data/files/parquet_historical_timestamp_legacy.parq differ
diff --git data/files/parquet_historical_timestamp_new.parq data/files/parquet_historical_timestamp_new.parq
new file mode 100644
index 0000000000..aab0707aa2
Binary files /dev/null and data/files/parquet_historical_timestamp_new.parq differ
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
index 9010ac36cd..e7ecb8d4a3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
@@ -19,6 +19,7 @@
 import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -603,7 +604,8 @@ protected TimestampWritableV2 convert(Binary binary) {
       //If this file written by current Hive implementation itself, we need to do the reverse conversion, else skip the conversion.
       boolean skipConversion = Boolean.parseBoolean(
           metadata.get(HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION.varname));
-      Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipConversion);
+      Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipConversion,
+          DataWritableReadSupport.getWriterTimeZoneId(metadata));
       return new TimestampWritableV2(ts);
     }
   };
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
index 7f2a684d28..fa1037a134 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
@@ -13,6 +13,8 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.read;
 
+import java.time.DateTimeException;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -26,6 +28,7 @@
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.FieldNode;
 import org.apache.hadoop.hive.ql.optimizer.NestedColumnFieldPruningUtils;
@@ -262,6 +265,24 @@ public static MessageType getProjectedSchema(
     return new MessageType(schema.getName(), schemaTypes);
   }
 
+  /**
+   * Get a valid zoneId from some metadata, otherwise return null
+   */
+  public static ZoneId getWriterTimeZoneId(Map<String, String> metadata) {
+    if (metadata == null) {
+      return null;
+    }
+    String value = metadata.get(DataWritableWriteSupport.WRITER_TIMEZONE);
+    try {
+      if (value != null) {
+        return ZoneId.of(value);
+      }
+    } catch (DateTimeException e) {
+      //can't parse ZoneId
+    }
+    return null;
+  }
+
   /**
    * Return the columns which contains required nested attribute level
    * E.g., given struct a: <x:int, y:int> while 'x' is required and 'y' is not, the method will return
@@ -448,11 +469,23 @@ private static MessageType getRequestedPrunedSchema(
       throw new IllegalStateException("ReadContext not initialized properly. " +
           "Don't know the Hive Schema.");
     }
+
     String key = HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION.varname;
     if (!metadata.containsKey(key)) {
       metadata.put(key, String.valueOf(HiveConf.getBoolVar(
         configuration, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)));
     }
+
+    String writerTimezone = DataWritableWriteSupport.WRITER_TIMEZONE;
+    if (!metadata.containsKey(writerTimezone)) {
+      if (keyValueMetaData.containsKey(writerTimezone)) {
+        metadata.put(writerTimezone, keyValueMetaData.get(writerTimezone));
+      }
+    } else if (!metadata.get(writerTimezone).equals(keyValueMetaData.get(writerTimezone))) {
+      throw new IllegalStateException("Metadata contains a writer time zone that does not match " +
+          "file footer's writer time zone.");
+    }
+
     return new DataWritableRecordConverter(readContext.getRequestedSchema(), metadata, hiveTypeInfo);
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
index bf78d8cc5b..0e1fd7d7a8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
@@ -13,6 +13,8 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.timestamp;
 
+import java.time.ZoneId;
+import java.time.ZoneOffset;
 import java.util.Calendar;
 import java.util.GregorianCalendar;
 import java.util.TimeZone;
@@ -33,33 +35,29 @@
   static final long NANOS_PER_DAY = TimeUnit.DAYS.toNanos(1);
 
   private static final ThreadLocal<Calendar> parquetGMTCalendar = new ThreadLocal<Calendar>();
-  private static final ThreadLocal<Calendar> parquetLocalCalendar = new ThreadLocal<Calendar>();
 
   private static Calendar getGMTCalendar() {
     //Calendar.getInstance calculates the current-time needlessly, so cache an instance.
     if (parquetGMTCalendar.get() == null) {
       parquetGMTCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT")));
     }
+    parquetGMTCalendar.get().clear();
     return parquetGMTCalendar.get();
   }
 
-  private static Calendar getLocalCalendar() {
-    if (parquetLocalCalendar.get() == null) {
-      parquetLocalCalendar.set(Calendar.getInstance());
-    }
-    return parquetLocalCalendar.get();
-  }
-
-  public static Calendar getCalendar(boolean skipConversion) {
-    Calendar calendar = skipConversion ? getLocalCalendar() : getGMTCalendar();
-    calendar.clear(); // Reset all fields before reusing this instance
-    return calendar;
+  public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion) {
+    return getNanoTime(ts, skipConversion, null);
   }
 
-  public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion) {
+  public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion, ZoneId timeZoneId) {
+    if (skipConversion) {
+      timeZoneId = ZoneOffset.UTC;
+    } else if (timeZoneId == null) {
+      timeZoneId = TimeZone.getDefault().toZoneId();
+    }
 
-    Calendar calendar = getCalendar(skipConversion);
-    calendar.setTimeInMillis(ts.toEpochMilli());
+    Calendar calendar = getGMTCalendar();
+    calendar.setTimeInMillis(ts.toEpochMilli(timeZoneId));
     int year = calendar.get(Calendar.YEAR);
     if (calendar.get(Calendar.ERA) == GregorianCalendar.BC) {
       year = 1 - year;
@@ -79,7 +77,17 @@ public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion) {
     return new NanoTime(days, nanosOfDay);
   }
 
-  public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion) {
+  public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion) {
+    return getTimestamp(nt, skipConversion, null);
+  }
+
+  public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion, ZoneId timeZoneId) {
+    if (skipConversion) {
+      timeZoneId = ZoneOffset.UTC;
+    } else if (timeZoneId == null) {
+      timeZoneId = TimeZone.getDefault().toZoneId();
+    }
+
     int julianDay = nt.getJulianDay();
     long nanosOfDay = nt.getTimeOfDayNanos();
@@ -92,7 +100,7 @@ public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion) {
     }
 
     JDateTime jDateTime = new JDateTime((double) julianDay);
-    Calendar calendar = getCalendar(skipConversion);
+    Calendar calendar = getGMTCalendar();
     calendar.set(Calendar.YEAR, jDateTime.getYear());
     calendar.set(Calendar.MONTH, jDateTime.getMonth() - 1); //java calendar index starting at 1.
     calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay());
@@ -107,7 +115,7 @@ public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion) {
     calendar.set(Calendar.HOUR_OF_DAY, hour);
     calendar.set(Calendar.MINUTE, minutes);
     calendar.set(Calendar.SECOND, seconds);
-    Timestamp ts = Timestamp.ofEpochMilli(calendar.getTimeInMillis(), (int) nanos);
+    Timestamp ts = Timestamp.ofEpochMilli(calendar.getTimeInMillis(), (int) nanos, timeZoneId);
    return ts;
   }
 }
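For illustration only (not part of the patch): a round-trip sketch of the new three-argument NanoTimeUtils methods, modelled on the updated unit tests further down; it assumes the patched Hive classes are on the classpath.

```java
import java.time.ZoneId;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;

public class NanoTimeRoundTrip {
  public static void main(String[] args) {
    ZoneId rome = ZoneId.of("Europe/Rome");
    Timestamp ts = Timestamp.valueOf("2019-01-01 00:30:30.111111111");

    // Write path: the wall clock is interpreted in the writer's zone before the
    // Julian-day / nanos-of-day split (skipConversion=false, explicit zone).
    NanoTime nt = NanoTimeUtils.getNanoTime(ts, false, rome);

    // Read path: passing the same writer zone restores the original wall clock,
    // independent of the reader JVM's default time zone.
    Timestamp back = NanoTimeUtils.getTimestamp(nt, false, rome);
    System.out.println(ts.toString().equals(back.toString())); // true

    // Passing null falls back to the reader's default zone, which is the legacy
    // behavior kept for files that carry no writer.time.zone metadata.
    Timestamp legacy = NanoTimeUtils.getTimestamp(nt, false, null);
    System.out.println(legacy);
  }
}
```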
@@ -116,12 +118,14 @@ public BaseVectorizedColumnReader(
       ColumnDescriptor descriptor,
       PageReader pageReader,
       boolean skipTimestampConversion,
+      ZoneId writerTimezone,
       Type parquetType, TypeInfo hiveType) throws IOException {
     this.descriptor = descriptor;
     this.type = parquetType;
     this.pageReader = pageReader;
     this.maxDefLevel = descriptor.getMaxDefinitionLevel();
     this.skipTimestampConversion = skipTimestampConversion;
+    this.writerTimezone = writerTimezone;
     this.hiveType = hiveType;
 
     DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
@@ -130,7 +134,7 @@ public BaseVectorizedColumnReader(
       this.dictionary = ParquetDataColumnReaderFactory
           .getDataColumnReaderByTypeOnDictionary(parquetType.asPrimitiveType(), hiveType,
               dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage),
-              skipTimestampConversion);
+              skipTimestampConversion, writerTimezone);
       this.isCurrentPageDictionaryEncoded = true;
     } catch (IOException e) {
       throw new IOException("could not decode the dictionary for " + descriptor, e);
@@ -182,11 +186,11 @@ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int
       }
       dataColumn = ParquetDataColumnReaderFactory.getDataColumnReaderByType(type.asPrimitiveType(),
           hiveType, dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary
-          .getDictionary()), skipTimestampConversion);
+          .getDictionary()), skipTimestampConversion, writerTimezone);
       this.isCurrentPageDictionaryEncoded = true;
     } else {
       dataColumn = ParquetDataColumnReaderFactory.getDataColumnReaderByType(type.asPrimitiveType(), hiveType,
-          dataEncoding.getValuesReader(descriptor, VALUES), skipTimestampConversion);
+          dataEncoding.getValuesReader(descriptor, VALUES), skipTimestampConversion, writerTimezone);
       this.isCurrentPageDictionaryEncoded = false;
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
index 7372275bc2..320ce526f3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
@@ -44,6 +44,7 @@
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
+import java.time.ZoneId;
 import java.util.Arrays;
 
 /**
@@ -1172,16 +1173,20 @@ private static String convertToString(boolean value) {
    */
   public static class TypesFromInt96PageReader extends DefaultParquetDataColumnReader {
     private boolean skipTimestampConversion = false;
+    private ZoneId writerTimezone;
 
     public TypesFromInt96PageReader(ValuesReader realReader, int length,
-        boolean skipTimestampConversion) {
+        boolean skipTimestampConversion, ZoneId writerTimezone) {
       super(realReader, length);
       this.skipTimestampConversion = skipTimestampConversion;
+      this.writerTimezone = writerTimezone;
     }
 
-    public TypesFromInt96PageReader(Dictionary dict, int length, boolean skipTimestampConversion) {
+    public TypesFromInt96PageReader(Dictionary dict, int length, boolean skipTimestampConversion,
+        ZoneId writerTimezone) {
       super(dict, length);
       this.skipTimestampConversion = skipTimestampConversion;
+      this.writerTimezone = writerTimezone;
     }
 
     private Timestamp convert(Binary binary) {
@@ -1190,7 +1195,7 @@ private Timestamp convert(Binary binary) {
       long timeOfDayNanos = buf.getLong();
       int julianDay = buf.getInt();
       NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
-      return NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
+      return NanoTimeUtils.getTimestamp(nt, skipTimestampConversion, writerTimezone);
     }
 
     @Override
@@ -1477,7 +1482,8 @@ private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(boolean i
                                                                          Dictionary dictionary,
                                                                          ValuesReader valuesReader,
                                                                          boolean
-                                                                             skipTimestampConversion)
+                                                                             skipTimestampConversion,
+                                                                         ZoneId writerTimezone)
       throws IOException {
     // max length for varchar and char cases
     int length = getVarcharLength(hiveType);
@@ -1523,8 +1529,8 @@ private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(boolean i
           hiveScale) : new TypesFromFloatPageReader(valuesReader, length, hivePrecision, hiveScale);
     case INT96:
       return isDictionary ? new TypesFromInt96PageReader(dictionary, length,
-          skipTimestampConversion) : new
-          TypesFromInt96PageReader(valuesReader, length, skipTimestampConversion);
+          skipTimestampConversion, writerTimezone) : new
+          TypesFromInt96PageReader(valuesReader, length, skipTimestampConversion, writerTimezone);
     case BOOLEAN:
       return isDictionary ? new TypesFromBooleanPageReader(dictionary, length) : new
           TypesFromBooleanPageReader(valuesReader, length);
@@ -1584,19 +1590,20 @@ private static ParquetDataColumnReader getConvertorFromBinary(boolean isDict,
   public static ParquetDataColumnReader getDataColumnReaderByTypeOnDictionary(
       PrimitiveType parquetType,
       TypeInfo hiveType,
-      Dictionary realReader, boolean skipTimestampConversion)
+      Dictionary realReader,
+      boolean skipTimestampConversion,
+      ZoneId writerTimezone)
       throws IOException {
     return getDataColumnReaderByTypeHelper(true, parquetType, hiveType, realReader, null,
-        skipTimestampConversion);
+        skipTimestampConversion, writerTimezone);
   }
 
   public static ParquetDataColumnReader getDataColumnReaderByType(PrimitiveType parquetType,
-      TypeInfo hiveType,
-      ValuesReader realReader,
-      boolean skipTimestampConversion)
+      TypeInfo hiveType, ValuesReader realReader, boolean skipTimestampConversion,
+      ZoneId writerTimezone)
       throws IOException {
     return getDataColumnReaderByTypeHelper(false, parquetType, hiveType, null, realReader,
-        skipTimestampConversion);
+        skipTimestampConversion, writerTimezone);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java
index 7e52b076b7..5c1ce70075 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java
@@ -28,6 +28,7 @@
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.schema.Type;
 
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
@@ -47,9 +48,9 @@
   boolean isFirstRow = true;
 
   public VectorizedListColumnReader(ColumnDescriptor descriptor, PageReader pageReader,
-      boolean skipTimestampConversion, Type type, TypeInfo hiveType)
+      boolean skipTimestampConversion, ZoneId writerTimezone, Type type, TypeInfo hiveType)
       throws IOException {
-    super(descriptor, pageReader, skipTimestampConversion, type, hiveType);
+    super(descriptor, pageReader, skipTimestampConversion, writerTimezone, type, hiveType);
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index fd776cf978..61e2556b08 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -73,6 +73,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -125,6 +126,7 @@
    * rows of all the row groups.
    */
   protected long totalRowCount = 0;
+  private ZoneId writerTimezone;
 
   public VectorizedParquetRecordReader(
       org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) {
@@ -250,6 +252,8 @@ public void initialize(
       this.totalRowCount += block.getRowCount();
     }
     this.fileSchema = footer.getFileMetaData().getSchema();
+    this.writerTimezone = DataWritableReadSupport
+        .getWriterTimeZoneId(footer.getFileMetaData().getKeyValueMetaData());
     colsToInclude = ColumnProjectionUtils.getReadColumnIDs(configuration);
     requestedSchema = DataWritableReadSupport
@@ -440,13 +444,13 @@ private void checkEndOfRowGroup() throws IOException {
         for (int i = 0; i < types.size(); ++i) {
           columnReaders[i] =
               buildVectorizedParquetReader(columnTypesList.get(colsToInclude.get(i)), types.get(i),
-                  pages, requestedSchema.getColumns(), skipTimestampConversion, 0);
+                  pages, requestedSchema.getColumns(), skipTimestampConversion, writerTimezone, 0);
         }
       }
     } else {
       for (int i = 0; i < types.size(); ++i) {
         columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(i), types.get(i), pages,
-            requestedSchema.getColumns(), skipTimestampConversion, 0);
+            requestedSchema.getColumns(), skipTimestampConversion, writerTimezone, 0);
       }
     }
 
@@ -489,6 +493,7 @@ private VectorizedColumnReader buildVectorizedParquetReader(
       PageReadStore pages,
       List<ColumnDescriptor> columnDescriptors,
       boolean skipTimestampConversion,
+      ZoneId writerTimezone,
       int depth) throws IOException {
     List<ColumnDescriptor> descriptors =
         getAllColumnDescriptorByType(depth, type, columnDescriptors);
@@ -500,7 +505,8 @@ private VectorizedColumnReader buildVectorizedParquetReader(
       }
       if (fileSchema.getColumns().contains(descriptors.get(0))) {
         return new VectorizedPrimitiveColumnReader(descriptors.get(0),
-            pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type, typeInfo);
+            pages.getPageReader(descriptors.get(0)), skipTimestampConversion, writerTimezone, type,
+            typeInfo);
       } else {
         // Support for schema evolution
         return new VectorizedDummyColumnReader();
       }
@@ -513,7 +519,7 @@
       for (int i = 0; i < fieldTypes.size(); i++) {
         VectorizedColumnReader r =
             buildVectorizedParquetReader(fieldTypes.get(i), types.get(i), pages, descriptors,
-                skipTimestampConversion, depth + 1);
+                skipTimestampConversion, writerTimezone, depth + 1);
         if (r != null) {
           fieldReaders.add(r);
         } else {
@@ -531,7 +537,8 @@
       }
       return new VectorizedListColumnReader(descriptors.get(0),
-          pages.getPageReader(descriptors.get(0)), skipTimestampConversion, getElementType(type),
+          pages.getPageReader(descriptors.get(0)), skipTimestampConversion, writerTimezone,
+          getElementType(type),
           typeInfo);
     case MAP:
       if (columnDescriptors == null || columnDescriptors.isEmpty()) {
@@ -564,10 +571,10 @@
       List<Type> kvTypes = groupType.getFields();
       VectorizedListColumnReader keyListColumnReader = new VectorizedListColumnReader(
           descriptors.get(0), pages.getPageReader(descriptors.get(0)), skipTimestampConversion,
-          kvTypes.get(0), typeInfo);
+          writerTimezone, kvTypes.get(0), typeInfo);
       VectorizedListColumnReader valueListColumnReader = new VectorizedListColumnReader(
           descriptors.get(1), pages.getPageReader(descriptors.get(1)), skipTimestampConversion,
-          kvTypes.get(1), typeInfo);
+          writerTimezone, kvTypes.get(1), typeInfo);
       return new VectorizedMapColumnReader(keyListColumnReader, valueListColumnReader);
     case UNION:
     default:
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
index 003fbd9fae..1a861f348b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
@@ -31,6 +31,7 @@
 import org.apache.parquet.schema.Type;
 
 import java.io.IOException;
+import java.time.ZoneId;
 
 /**
  * It's column level Parquet reader which is used to read a batch of records for a column,
@@ -47,8 +48,11 @@ public VectorizedPrimitiveColumnReader(
       ColumnDescriptor descriptor,
       PageReader pageReader,
       boolean skipTimestampConversion,
-      Type type, TypeInfo hiveType) throws IOException {
-    super(descriptor, pageReader, skipTimestampConversion, type, hiveType);
+      ZoneId writerTimezone,
+      Type type,
+      TypeInfo hiveType)
+      throws IOException {
+    super(descriptor, pageReader, skipTimestampConversion, writerTimezone, type, hiveType);
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java
index 896094473b..9a3cc6daa3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java
@@ -14,6 +14,7 @@
 package org.apache.hadoop.hive.ql.io.parquet.write;
 
 import java.util.HashMap;
+import java.util.TimeZone;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
@@ -32,6 +33,7 @@
 public class DataWritableWriteSupport extends WriteSupport<ParquetHiveRecord> {
 
   public static final String PARQUET_HIVE_SCHEMA = "parquet.hive.schema";
+  public static final String WRITER_TIMEZONE = "writer.time.zone";
 
   private DataWritableWriter writer;
   private MessageType schema;
@@ -47,7 +49,9 @@ public static MessageType getSchema(final Configuration configuration) {
   @Override
   public WriteContext init(final Configuration configuration) {
     schema = getSchema(configuration);
-    return new WriteContext(schema, new HashMap<String, String>());
+    HashMap<String, String> metaData = new HashMap<>();
+    metaData.put(WRITER_TIMEZONE, TimeZone.getDefault().toZoneId().toString());
+    return new WriteContext(schema, metaData);
   }
 
   @Override
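For illustration only (not part of the patch): a minimal sketch of the agreement between the write side (DataWritableWriteSupport.init serializes the JVM default zone under "writer.time.zone") and the read side (getWriterTimeZoneId parses it back). Only standard java.time/java.util calls are used.

```java
import java.time.ZoneId;
import java.util.TimeZone;

public class WriterTimezoneRoundTrip {
  public static void main(String[] args) {
    // Write side: the writer JVM's default zone, serialized as its zone id string.
    String written = TimeZone.getDefault().toZoneId().toString();

    // Read side: the same string parses back into an equal ZoneId, so writer and
    // reader end up applying the same zone rules when converting INT96 timestamps.
    ZoneId parsed = ZoneId.of(written);
    System.out.println(parsed.equals(TimeZone.getDefault().toZoneId())); // true
  }
}
```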
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/serde/TestParquetTimestampUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/serde/TestParquetTimestampUtils.java
index 477825e3f4..6c617dcc28 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/serde/TestParquetTimestampUtils.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/serde/TestParquetTimestampUtils.java
@@ -13,6 +13,7 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.serde;
 
+import java.time.ZoneId;
 import java.util.Calendar;
 import java.util.GregorianCalendar;
 import java.util.TimeZone;
@@ -32,6 +33,9 @@
  */
 public class TestParquetTimestampUtils extends TestCase {
 
+  public static final ZoneId GMT = TimeZone.getTimeZone("GMT").toZoneId();
+  public static final ZoneId US_PACIFIC = TimeZone.getTimeZone("US/Pacific").toZoneId();
+
   public void testJulianDay() {
     //check if May 23, 1968 is Julian Day 2440000
     Calendar cal = Calendar.getInstance();
@@ -119,7 +123,7 @@ public void testNanos() {
     Timestamp ts = Timestamp.ofEpochMilli(cal.getTimeInMillis(), 1);
 
     //(1*60*60 + 1*60 + 1) * 10e9 + 1
-    NanoTime nt = NanoTimeUtils.getNanoTime(ts, false);
+    NanoTime nt = NanoTimeUtils.getNanoTime(ts, false, GMT);
     Assert.assertEquals(nt.getTimeOfDayNanos(), 3661000000001L);
 
     //case 2: 23:59:59.999999999
@@ -134,7 +138,7 @@ public void testNanos() {
     ts = Timestamp.ofEpochMilli(cal.getTimeInMillis(), 999999999);
 
     //(23*60*60 + 59*60 + 59)*10e9 + 999999999
-    nt = NanoTimeUtils.getNanoTime(ts, false);
+    nt = NanoTimeUtils.getNanoTime(ts, false, GMT);
     Assert.assertEquals(nt.getTimeOfDayNanos(), 86399999999999L);
 
     //case 3: verify the difference.
@@ -158,15 +162,15 @@ public void testNanos() {
     cal1.setTimeZone(TimeZone.getTimeZone("GMT"));
     Timestamp ts1 = Timestamp.ofEpochMilli(cal1.getTimeInMillis(), 1);
 
-    NanoTime n2 = NanoTimeUtils.getNanoTime(ts2, false);
-    NanoTime n1 = NanoTimeUtils.getNanoTime(ts1, false);
+    NanoTime n2 = NanoTimeUtils.getNanoTime(ts2, false, GMT);
+    NanoTime n1 = NanoTimeUtils.getNanoTime(ts1, false, GMT);
     Assert.assertEquals(n2.getTimeOfDayNanos() - n1.getTimeOfDayNanos(), 600000000009L);
 
     NanoTime n3 = new NanoTime(n1.getJulianDay() - 1, n1.getTimeOfDayNanos() + TimeUnit.DAYS.toNanos(1));
-    Assert.assertEquals(ts1, NanoTimeUtils.getTimestamp(n3, false));
+    Assert.assertEquals(ts1, NanoTimeUtils.getTimestamp(n3, false, GMT));
     n3 = new NanoTime(n1.getJulianDay() + 3, n1.getTimeOfDayNanos() - TimeUnit.DAYS.toNanos(3));
-    Assert.assertEquals(ts1, NanoTimeUtils.getTimestamp(n3, false));
+    Assert.assertEquals(ts1, NanoTimeUtils.getTimestamp(n3, false, GMT));
   }
 
   public void testTimezone() {
@@ -178,7 +182,7 @@ public void testTimezone() {
     cal.set(Calendar.MINUTE, 1);
     cal.set(Calendar.SECOND, 1);
     cal.setTimeZone(TimeZone.getTimeZone("US/Pacific"));
-    Timestamp ts = Timestamp.ofEpochMilli(cal.getTimeInMillis(), 1);
+    Timestamp ts = Timestamp.ofEpochMilli(cal.getTimeInMillis(), 1, US_PACIFIC);
 
     /**
      * 17:00 PDT = 00:00 GMT (daylight-savings)
@@ -187,7 +191,7 @@ public void testTimezone() {
      * 17:00 PST = 01:00 GMT (if not daylight savings)
      * (1*60*60 + 1*60 + 1)*10e9 + 1 = 3661000000001
      */
-    NanoTime nt = NanoTimeUtils.getNanoTime(ts, false);
+    NanoTime nt = NanoTimeUtils.getNanoTime(ts, false, US_PACIFIC);
     long timeOfDayNanos = nt.getTimeOfDayNanos();
     Assert.assertTrue(timeOfDayNanos == 61000000001L || timeOfDayNanos == 3661000000001L);
 
@@ -206,15 +210,15 @@ public void testTimezonelessValues() {
   public void testTimezoneless() {
     Timestamp ts1 = Timestamp.valueOf("2011-01-01 00:30:30.111111111");
     NanoTime nt1 = NanoTimeUtils.getNanoTime(ts1, true);
-    Assert.assertEquals(nt1.getJulianDay(), 2455562);
-    Assert.assertEquals(nt1.getTimeOfDayNanos(), 59430111111111L);
+    Assert.assertEquals(nt1.getJulianDay(), 2455563);
+    Assert.assertEquals(nt1.getTimeOfDayNanos(), 1830111111111L);
     Timestamp ts1Fetched = NanoTimeUtils.getTimestamp(nt1, true);
     Assert.assertEquals(ts1Fetched.toString(), ts1.toString());
 
     Timestamp ts2 = Timestamp.valueOf("2011-02-02 08:30:30.222222222");
     NanoTime nt2 = NanoTimeUtils.getNanoTime(ts2, true);
     Assert.assertEquals(nt2.getJulianDay(), 2455595);
-    Assert.assertEquals(nt2.getTimeOfDayNanos(), 1830222222222L);
+    Assert.assertEquals(nt2.getTimeOfDayNanos(), 30630222222222L);
     Timestamp ts2Fetched = NanoTimeUtils.getTimestamp(nt2, true);
     Assert.assertEquals(ts2Fetched.toString(), ts2.toString());
   }
diff --git ql/src/test/queries/clientpositive/parquet_historical_timestamp.q ql/src/test/queries/clientpositive/parquet_historical_timestamp.q
new file mode 100644
index 0000000000..3d2b382af7
--- /dev/null
+++ ql/src/test/queries/clientpositive/parquet_historical_timestamp.q
@@ -0,0 +1,16 @@
+--These files were created by inserting timestamp '2019-01-01 00:30:30.111111111' where writer time zone is Europe/Rome.
+
+--older writer: time zone dependent behavior. convert to reader time zone
+create table legacy_table (t timestamp) stored as parquet;
+
+load data local inpath '../../data/files/parquet_historical_timestamp_legacy.parq' into table legacy_table;
+
+select * from legacy_table;
+
+
+--newer writer: time zone agnostic behavior. convert to writer time zone
+create table new_table (t timestamp) stored as parquet;
+
+load data local inpath '../../data/files/parquet_historical_timestamp_new.parq' into table new_table;
+
+select * from new_table;
\ No newline at end of file
diff --git ql/src/test/results/clientpositive/parquet_historical_timestamp.q.out ql/src/test/results/clientpositive/parquet_historical_timestamp.q.out
new file mode 100644
index 0000000000..9d50b22677
--- /dev/null
+++ ql/src/test/results/clientpositive/parquet_historical_timestamp.q.out
@@ -0,0 +1,50 @@
+PREHOOK: query: create table legacy_table (t timestamp) stored as parquet
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@legacy_table
+POSTHOOK: query: create table legacy_table (t timestamp) stored as parquet
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@legacy_table
+PREHOOK: query: load data local inpath '../../data/files/parquet_historical_timestamp_legacy.parq' into table legacy_table
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@legacy_table
+POSTHOOK: query: load data local inpath '../../data/files/parquet_historical_timestamp_legacy.parq' into table legacy_table
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@legacy_table
+PREHOOK: query: select * from legacy_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@legacy_table
+#### A masked pattern was here ####
+POSTHOOK: query: select * from legacy_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@legacy_table
+#### A masked pattern was here ####
+2018-12-31 16:30:30.111111111
+PREHOOK: query: create table new_table (t timestamp) stored as parquet
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@new_table
+POSTHOOK: query: create table new_table (t timestamp) stored as parquet
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@new_table
+PREHOOK: query: load data local inpath '../../data/files/parquet_historical_timestamp_new.parq' into table new_table
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@new_table
+POSTHOOK: query: load data local inpath '../../data/files/parquet_historical_timestamp_new.parq' into table new_table
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@new_table
+PREHOOK: query: select * from new_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@new_table
+#### A masked pattern was here ####
+POSTHOOK: query: select * from new_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@new_table
+#### A masked pattern was here ####
+2019-01-01 00:30:30.111111111
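For illustration only (not part of the patch): a condensed sketch of the reader-side decision the qfile above exercises, using the patched Hive classes. The hand-built footer maps and the Europe/Rome value are stand-ins for real file metadata. When the footer carries writer.time.zone the original wall clock is recovered; without it the conversion falls back to the reader's default zone, which is why the legacy table's output above depends on the zone of the JVM that ran the test.

```java
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;

public class HistoricalTimestampRead {
  public static void main(String[] args) {
    // INT96 value as produced by a writer whose JVM zone was Europe/Rome.
    NanoTime stored = NanoTimeUtils.getNanoTime(
        Timestamp.valueOf("2019-01-01 00:30:30.111111111"), false, ZoneId.of("Europe/Rome"));

    // New file: the footer carries writer.time.zone, so the reader converts back
    // with the writer's zone and recovers the original wall clock.
    Map<String, String> newFooter = new HashMap<>();
    newFooter.put("writer.time.zone", "Europe/Rome");
    System.out.println(NanoTimeUtils.getTimestamp(
        stored, false, DataWritableReadSupport.getWriterTimeZoneId(newFooter)));

    // Legacy file: no writer.time.zone key, so getWriterTimeZoneId returns null and
    // NanoTimeUtils falls back to the reader's default zone; the displayed wall clock
    // therefore shifts with the reading JVM's zone, as in the legacy_table output.
    Map<String, String> legacyFooter = new HashMap<>();
    System.out.println(NanoTimeUtils.getTimestamp(
        stored, false, DataWritableReadSupport.getWriterTimeZoneId(legacyFooter)));
  }
}
```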