diff --git common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java index a8b7b6d186..97a3ce1e6d 100644 --- common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java +++ common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java @@ -19,6 +19,7 @@ import java.time.Instant; import java.time.LocalDateTime; +import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; @@ -135,6 +136,13 @@ public long toEpochMilli() { return localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli(); } + /** + * @return milliseconds since epoch at time zone of zoneId + */ + public long toEpochMilli(ZoneId zoneId) { + return localDateTime.toInstant(zoneId.getRules().getOffset(localDateTime)).toEpochMilli(); + } + public void setTimeInMillis(long epochMilli) { localDateTime = LocalDateTime.ofInstant( Instant.ofEpochMilli(epochMilli), ZoneOffset.UTC); @@ -181,8 +189,11 @@ public static Timestamp ofEpochMilli(long epochMilli) { } public static Timestamp ofEpochMilli(long epochMilli, int nanos) { - return new Timestamp(LocalDateTime - .ofInstant(Instant.ofEpochMilli(epochMilli), ZoneOffset.UTC) + return ofEpochMilli(epochMilli, nanos, ZoneOffset.UTC); + } + + public static Timestamp ofEpochMilli(long epochMilli, int nanos, ZoneId zoneId) { + return new Timestamp(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMilli), zoneId) .withNano(nanos)); } diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 4919a4e462..da03457717 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -5466,6 +5466,14 @@ public ZoneId getLocalTimeZone() { return TimestampTZUtil.parseTimeZone(timeZoneStr); } + /** + * Obtains the local time zone ID. + */ + public static ZoneId getLocalTimeZone(Configuration configuration) { + String timeZoneStr = getVar(configuration, ConfVars.HIVE_LOCAL_TIME_ZONE); + return TimestampTZUtil.parseTimeZone(timeZoneStr); + } + /** * @param paramList list of parameter strings * @return list of parameter strings with "." replaced by "\." 
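The hunks above give Hive's Timestamp a zone-aware pair, toEpochMilli(ZoneId) and ofEpochMilli(long, int, ZoneId), and add a static HiveConf.getLocalTimeZone(Configuration) overload that the Parquet read/write paths below use to resolve ConfVars.HIVE_LOCAL_TIME_ZONE. A minimal round-trip sketch written against plain java.time, mirroring the new methods rather than calling the Hive classes; the class name, zone, and sample value are illustrative only:

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Illustrative sketch of the zone-aware conversion added above; not Hive code.
public class ZoneAwareEpochSketch {

  // Mirrors Timestamp.toEpochMilli(ZoneId): resolve the wall-clock value against
  // the zone's offset at that local time, then convert to epoch milliseconds.
  static long toEpochMilli(LocalDateTime localDateTime, ZoneId zoneId) {
    return localDateTime.toInstant(zoneId.getRules().getOffset(localDateTime)).toEpochMilli();
  }

  // Mirrors Timestamp.ofEpochMilli(epochMilli, nanos, ZoneId): rebuild the wall-clock
  // value in the given zone, then overwrite the sub-second part with the stored nanos.
  static LocalDateTime ofEpochMilli(long epochMilli, int nanos, ZoneId zoneId) {
    return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMilli), zoneId).withNano(nanos);
  }

  public static void main(String[] args) {
    ZoneId zone = ZoneId.of("Europe/Rome");
    LocalDateTime wallClock = LocalDateTime.of(2011, 1, 1, 0, 30, 30, 111111111);
    long millis = toEpochMilli(wallClock, zone);
    // Converting back through the same zone restores the original wall-clock value.
    System.out.println(ofEpochMilli(millis, wallClock.getNano(), zone).equals(wallClock)); // true
  }
}

The same ZoneId flows from ParquetRecordReaderBase and DataWritableReadSupport into the converters and column readers below, so the read and write sides interpret the stored Julian-day/nanos value in the same zone.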
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java index 033e26a238..28a6916ea7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; @@ -49,6 +50,7 @@ protected Path file; protected ProjectionPusher projectionPusher; protected boolean skipTimestampConversion = false; + protected ZoneId timeZoneId; protected SerDeStats serDeStats; protected JobConf jobConf; @@ -125,6 +127,7 @@ protected ParquetInputSplit getSplit( if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) { skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr"); } + timeZoneId = HiveConf.getLocalTimeZone(conf); split = new ParquetInputSplit(finalPath, splitStart, splitLength, diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java index 9010ac36cd..957ed09943 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java @@ -14,11 +14,13 @@ package org.apache.hadoop.hive.ql.io.parquet.convert; import java.math.BigDecimal; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Map; import org.apache.hadoop.hive.common.type.Timestamp; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; import org.apache.hadoop.hive.serde.serdeConstants; @@ -603,7 +605,8 @@ protected TimestampWritableV2 convert(Binary binary) { //If this file written by current Hive implementation itself, we need to do the reverse conversion, else skip the conversion. 
boolean skipConversion = Boolean.parseBoolean( metadata.get(HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION.varname)); - Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipConversion); + ZoneId timeZoneId = ZoneId.of(metadata.get(DataWritableReadSupport.LOCAL_TIME)); + Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipConversion, timeZoneId); return new TimestampWritableV2(ts); } }; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java index 7f2a684d28..81bf113f43 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java @@ -61,6 +61,7 @@ public static final String HIVE_TABLE_AS_PARQUET_SCHEMA = "HIVE_TABLE_SCHEMA"; public static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.column.index.access"; + public static final String LOCAL_TIME = "LOCAL_TIME"; private TypeInfo hiveTypeInfo; /** * From a string which columns names (including hive column), return a list @@ -348,6 +349,8 @@ private static GroupType buildProjectedGroupType( Map contextMetadata = new HashMap(); boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false); + contextMetadata.put(LOCAL_TIME, String.valueOf(HiveConf.getLocalTimeZone(configuration))); + if (columnNames != null) { List columnNamesList = getColumnNames(columnNames); String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java index bf78d8cc5b..7ae575e84d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java @@ -13,6 +13,7 @@ */ package org.apache.hadoop.hive.ql.io.parquet.timestamp; +import java.time.ZoneId; import java.util.Calendar; import java.util.GregorianCalendar; import java.util.TimeZone; @@ -33,33 +34,30 @@ static final long NANOS_PER_DAY = TimeUnit.DAYS.toNanos(1); private static final ThreadLocal parquetGMTCalendar = new ThreadLocal(); - private static final ThreadLocal parquetLocalCalendar = new ThreadLocal(); + + public static final String UTC = "UTC"; private static Calendar getGMTCalendar() { //Calendar.getInstance calculates the current-time needlessly, so cache an instance. if (parquetGMTCalendar.get() == null) { parquetGMTCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT"))); } + parquetGMTCalendar.get().clear(); return parquetGMTCalendar.get(); } - private static Calendar getLocalCalendar() { - if (parquetLocalCalendar.get() == null) { - parquetLocalCalendar.set(Calendar.getInstance()); - } - return parquetLocalCalendar.get(); - } - public static Calendar getCalendar(boolean skipConversion) { - Calendar calendar = skipConversion ? 
getLocalCalendar() : getGMTCalendar(); - calendar.clear(); // Reset all fields before reusing this instance - return calendar; - } + public static NanoTime getNanoTime(Timestamp ts) { + return getNanoTime(ts, true, null); + } - public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion) { - Calendar calendar = getCalendar(skipConversion); - calendar.setTimeInMillis(ts.toEpochMilli()); + public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion, ZoneId timeZoneId) { + if (skipConversion || timeZoneId == null) { + timeZoneId = ZoneId.of(UTC); + } + Calendar calendar = getGMTCalendar(); + calendar.setTimeInMillis(ts.toEpochMilli(timeZoneId)); int year = calendar.get(Calendar.YEAR); if (calendar.get(Calendar.ERA) == GregorianCalendar.BC) { year = 1 - year; @@ -79,7 +77,16 @@ public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion) { return new NanoTime(days, nanosOfDay); } - public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion) { + + public static Timestamp getTimestamp(NanoTime nt) { + return getTimestamp(nt, true, null); + } + + + public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion, ZoneId timeZoneId) { + if (skipConversion || timeZoneId == null) { + timeZoneId = ZoneId.of(UTC); + } int julianDay = nt.getJulianDay(); long nanosOfDay = nt.getTimeOfDayNanos(); @@ -92,7 +99,7 @@ public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion) { } JDateTime jDateTime = new JDateTime((double) julianDay); - Calendar calendar = getCalendar(skipConversion); + Calendar calendar = getGMTCalendar(); calendar.set(Calendar.YEAR, jDateTime.getYear()); calendar.set(Calendar.MONTH, jDateTime.getMonth() - 1); //java calendar index starting at 1. calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay()); @@ -107,7 +114,7 @@ public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion) { calendar.set(Calendar.HOUR_OF_DAY, hour); calendar.set(Calendar.MINUTE, minutes); calendar.set(Calendar.SECOND, seconds); - Timestamp ts = Timestamp.ofEpochMilli(calendar.getTimeInMillis(), (int) nanos); + Timestamp ts = Timestamp.ofEpochMilli(calendar.getTimeInMillis(), (int) nanos, timeZoneId); return ts; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java index 9ce1ba4591..a14fe3338b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java @@ -39,6 +39,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; +import java.time.ZoneId; import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; @@ -53,6 +54,7 @@ private static final Logger LOG = LoggerFactory.getLogger(BaseVectorizedColumnReader.class); protected boolean skipTimestampConversion = false; + protected ZoneId timeZoneId = null; /** * Total number of values read. 
@@ -116,12 +118,14 @@ public BaseVectorizedColumnReader( ColumnDescriptor descriptor, PageReader pageReader, boolean skipTimestampConversion, + ZoneId timeZoneId, Type parquetType, TypeInfo hiveType) throws IOException { this.descriptor = descriptor; this.type = parquetType; this.pageReader = pageReader; this.maxDefLevel = descriptor.getMaxDefinitionLevel(); this.skipTimestampConversion = skipTimestampConversion; + this.timeZoneId = timeZoneId; this.hiveType = hiveType; DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); @@ -130,7 +134,7 @@ public BaseVectorizedColumnReader( this.dictionary = ParquetDataColumnReaderFactory .getDataColumnReaderByTypeOnDictionary(parquetType.asPrimitiveType(), hiveType, dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage), - skipTimestampConversion); + skipTimestampConversion, timeZoneId); this.isCurrentPageDictionaryEncoded = true; } catch (IOException e) { throw new IOException("could not decode the dictionary for " + descriptor, e); @@ -182,11 +186,11 @@ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int } dataColumn = ParquetDataColumnReaderFactory.getDataColumnReaderByType(type.asPrimitiveType(), hiveType, dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary - .getDictionary()), skipTimestampConversion); + .getDictionary()), skipTimestampConversion, timeZoneId); this.isCurrentPageDictionaryEncoded = true; } else { dataColumn = ParquetDataColumnReaderFactory.getDataColumnReaderByType(type.asPrimitiveType(), hiveType, - dataEncoding.getValuesReader(descriptor, VALUES), skipTimestampConversion); + dataEncoding.getValuesReader(descriptor, VALUES), skipTimestampConversion, timeZoneId); this.isCurrentPageDictionaryEncoded = false; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java index c1d71337d8..0894115586 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java @@ -44,6 +44,7 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.time.ZoneId; import java.util.Arrays; /** @@ -1176,17 +1177,21 @@ private static String convertToString(boolean value) { * The reader who reads from the underlying Timestamp value value. 
*/ public static class TypesFromInt96PageReader extends DefaultParquetDataColumnReader { + private final ZoneId timeZoneId; private boolean skipTimestampConversion = false; public TypesFromInt96PageReader(ValuesReader realReader, int length, - boolean skipTimestampConversion) { + boolean skipTimestampConversion, ZoneId timeZoneId) { super(realReader, length); this.skipTimestampConversion = skipTimestampConversion; + this.timeZoneId = timeZoneId; } - public TypesFromInt96PageReader(Dictionary dict, int length, boolean skipTimestampConversion) { + public TypesFromInt96PageReader(Dictionary dict, int length, boolean skipTimestampConversion, + ZoneId timeZoneId) { super(dict, length); this.skipTimestampConversion = skipTimestampConversion; + this.timeZoneId = timeZoneId; } private Timestamp convert(Binary binary) { @@ -1195,7 +1200,7 @@ private Timestamp convert(Binary binary) { long timeOfDayNanos = buf.getLong(); int julianDay = buf.getInt(); NanoTime nt = new NanoTime(julianDay, timeOfDayNanos); - return NanoTimeUtils.getTimestamp(nt, skipTimestampConversion); + return NanoTimeUtils.getTimestamp(nt, skipTimestampConversion, timeZoneId); } @Override @@ -1482,7 +1487,8 @@ private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(boolean i Dictionary dictionary, ValuesReader valuesReader, boolean - skipTimestampConversion) + skipTimestampConversion, + ZoneId timeZoneId) throws IOException { // max length for varchar and char cases int length = getVarcharLength(hiveType); @@ -1528,8 +1534,8 @@ private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(boolean i hiveScale) : new TypesFromFloatPageReader(valuesReader, length, hivePrecision, hiveScale); case INT96: return isDictionary ? new TypesFromInt96PageReader(dictionary, length, - skipTimestampConversion) : new - TypesFromInt96PageReader(valuesReader, length, skipTimestampConversion); + skipTimestampConversion, timeZoneId) : new + TypesFromInt96PageReader(valuesReader, length, skipTimestampConversion, timeZoneId); case BOOLEAN: return isDictionary ? 
new TypesFromBooleanPageReader(dictionary, length) : new TypesFromBooleanPageReader(valuesReader, length); @@ -1589,19 +1595,22 @@ private static ParquetDataColumnReader getConvertorFromBinary(boolean isDict, public static ParquetDataColumnReader getDataColumnReaderByTypeOnDictionary( PrimitiveType parquetType, TypeInfo hiveType, - Dictionary realReader, boolean skipTimestampConversion) + Dictionary realReader, + boolean skipTimestampConversion, + ZoneId timeZoneId) throws IOException { return getDataColumnReaderByTypeHelper(true, parquetType, hiveType, realReader, null, - skipTimestampConversion); + skipTimestampConversion, timeZoneId); } public static ParquetDataColumnReader getDataColumnReaderByType(PrimitiveType parquetType, TypeInfo hiveType, ValuesReader realReader, - boolean skipTimestampConversion) + boolean skipTimestampConversion, + ZoneId timeZoneId) throws IOException { return getDataColumnReaderByTypeHelper(false, parquetType, hiveType, null, realReader, - skipTimestampConversion); + skipTimestampConversion, timeZoneId); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java index 7e52b076b7..1ecdda57ae 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java @@ -28,6 +28,7 @@ import org.apache.parquet.column.page.PageReader; import org.apache.parquet.schema.Type; import java.io.IOException; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; @@ -46,10 +47,10 @@ // flag to indicate if it's the first time to read parquet data page with this instance boolean isFirstRow = true; - public VectorizedListColumnReader(ColumnDescriptor descriptor, PageReader pageReader, - boolean skipTimestampConversion, Type type, TypeInfo hiveType) + public VectorizedListColumnReader(ColumnDescriptor descriptor, PageReader pageReader, boolean skipTimestampConversion, + ZoneId timeZoneId, Type type, TypeInfo hiveType) throws IOException { - super(descriptor, pageReader, skipTimestampConversion, type, hiveType); + super(descriptor, pageReader, skipTimestampConversion, timeZoneId, type, hiveType); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index fd776cf978..c112dcf54e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -73,6 +73,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -440,13 +441,13 @@ private void checkEndOfRowGroup() throws IOException { for (int i = 0; i < types.size(); ++i) { columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(colsToInclude.get(i)), types.get(i), - pages, requestedSchema.getColumns(), skipTimestampConversion, 0); + pages, requestedSchema.getColumns(), skipTimestampConversion, timeZoneId, 0); } } } else { for (int i = 0; i < types.size(); ++i) { columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(i), types.get(i), pages, - requestedSchema.getColumns(), skipTimestampConversion, 0); + requestedSchema.getColumns(), 
skipTimestampConversion, timeZoneId, 0); } } @@ -489,6 +490,7 @@ private VectorizedColumnReader buildVectorizedParquetReader( PageReadStore pages, List columnDescriptors, boolean skipTimestampConversion, + ZoneId timeZoneId, int depth) throws IOException { List descriptors = getAllColumnDescriptorByType(depth, type, columnDescriptors); @@ -500,7 +502,7 @@ private VectorizedColumnReader buildVectorizedParquetReader( } if (fileSchema.getColumns().contains(descriptors.get(0))) { return new VectorizedPrimitiveColumnReader(descriptors.get(0), - pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type, typeInfo); + pages.getPageReader(descriptors.get(0)), skipTimestampConversion, timeZoneId, type, typeInfo); } else { // Support for schema evolution return new VectorizedDummyColumnReader(); @@ -513,7 +515,7 @@ private VectorizedColumnReader buildVectorizedParquetReader( for (int i = 0; i < fieldTypes.size(); i++) { VectorizedColumnReader r = buildVectorizedParquetReader(fieldTypes.get(i), types.get(i), pages, descriptors, - skipTimestampConversion, depth + 1); + skipTimestampConversion, timeZoneId, depth + 1); if (r != null) { fieldReaders.add(r); } else { @@ -531,7 +533,7 @@ private VectorizedColumnReader buildVectorizedParquetReader( } return new VectorizedListColumnReader(descriptors.get(0), - pages.getPageReader(descriptors.get(0)), skipTimestampConversion, getElementType(type), + pages.getPageReader(descriptors.get(0)), skipTimestampConversion, timeZoneId, getElementType(type), typeInfo); case MAP: if (columnDescriptors == null || columnDescriptors.isEmpty()) { @@ -564,10 +566,10 @@ private VectorizedColumnReader buildVectorizedParquetReader( List kvTypes = groupType.getFields(); VectorizedListColumnReader keyListColumnReader = new VectorizedListColumnReader( descriptors.get(0), pages.getPageReader(descriptors.get(0)), skipTimestampConversion, - kvTypes.get(0), typeInfo); + timeZoneId, kvTypes.get(0), typeInfo); VectorizedListColumnReader valueListColumnReader = new VectorizedListColumnReader( descriptors.get(1), pages.getPageReader(descriptors.get(1)), skipTimestampConversion, - kvTypes.get(1), typeInfo); + timeZoneId, kvTypes.get(1), typeInfo); return new VectorizedMapColumnReader(keyListColumnReader, valueListColumnReader); case UNION: default: diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java index 003fbd9fae..e94bcd474b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java @@ -31,6 +31,7 @@ import org.apache.parquet.schema.Type; import java.io.IOException; +import java.time.ZoneId; /** * It's column level Parquet reader which is used to read a batch of records for a column, @@ -47,8 +48,9 @@ public VectorizedPrimitiveColumnReader( ColumnDescriptor descriptor, PageReader pageReader, boolean skipTimestampConversion, + ZoneId timeZoneId, Type type, TypeInfo hiveType) throws IOException { - super(descriptor, pageReader, skipTimestampConversion, type, hiveType); + super(descriptor, pageReader, skipTimestampConversion, timeZoneId, type, hiveType); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java index 896094473b..a51042bcdc 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java @@ -13,9 +13,11 @@ */ package org.apache.hadoop.hive.ql.io.parquet.write; +import java.time.ZoneId; import java.util.HashMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import org.apache.hive.common.util.HiveVersionInfo; @@ -35,6 +37,7 @@ private DataWritableWriter writer; private MessageType schema; + private ZoneId zoneId; public static void setSchema(final MessageType schema, final Configuration configuration) { configuration.set(PARQUET_HIVE_SCHEMA, schema.toString()); @@ -46,13 +49,14 @@ public static MessageType getSchema(final Configuration configuration) { @Override public WriteContext init(final Configuration configuration) { + zoneId = HiveConf.getLocalTimeZone(configuration); schema = getSchema(configuration); return new WriteContext(schema, new HashMap()); } @Override public void prepareForWrite(final RecordConsumer recordConsumer) { - writer = new DataWritableWriter(recordConsumer, schema); + writer = new DataWritableWriter(recordConsumer, schema, zoneId); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java index 3d61c33afd..8a3fbe9d70 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java @@ -49,6 +49,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.ZoneId; import java.util.List; import java.util.Map; @@ -62,14 +63,16 @@ private static final Logger LOG = LoggerFactory.getLogger(DataWritableWriter.class); protected final RecordConsumer recordConsumer; private final GroupType schema; + private final ZoneId zoneId; /* This writer will be created when writing the first row in order to get information about how to inspect the record data. 
*/ private DataWriter messageWriter; - public DataWritableWriter(final RecordConsumer recordConsumer, final GroupType schema) { + public DataWritableWriter(final RecordConsumer recordConsumer, final GroupType schema, final ZoneId zoneId) { this.recordConsumer = recordConsumer; this.schema = schema; + this.zoneId = zoneId; } /** @@ -496,7 +499,7 @@ public TimestampDataWriter(TimestampObjectInspector inspector) { @Override public void write(Object value) { Timestamp ts = inspector.getPrimitiveJavaObject(value); - recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary()); + recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false, zoneId).toBinary()); } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java index 94a38e0173..9bb2107d98 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java @@ -14,6 +14,7 @@ package org.apache.hadoop.hive.ql.io.parquet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter; @@ -45,6 +46,7 @@ import org.apache.parquet.schema.MessageTypeParser; import java.io.UnsupportedEncodingException; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -192,7 +194,7 @@ private ParquetHiveRecord getParquetWritable(String columnNames, String columnTy private void writeParquetRecord(String schema, ParquetHiveRecord record) throws SerDeException { MessageType fileSchema = MessageTypeParser.parseMessageType(schema); - DataWritableWriter hiveParquetWriter = new DataWritableWriter(mockRecordConsumer, fileSchema); + DataWritableWriter hiveParquetWriter = new DataWritableWriter(mockRecordConsumer, fileSchema, null); hiveParquetWriter.write(record); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java index 1d32afe00c..37bf84b025 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java @@ -211,7 +211,7 @@ protected static boolean getBooleanValue( protected static NanoTime getNanoTime(int index) { Timestamp ts = new Timestamp(); ts.setTimeInMillis(index); - return NanoTimeUtils.getNanoTime(ts, false); + return NanoTimeUtils.getNanoTime(ts, false, HiveConf.getLocalTimeZone(conf)); } protected static HiveDecimal getDecimal( diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/serde/TestParquetTimestampUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/serde/TestParquetTimestampUtils.java index 477825e3f4..383202a489 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/serde/TestParquetTimestampUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/serde/TestParquetTimestampUtils.java @@ -13,6 +13,7 @@ */ package org.apache.hadoop.hive.ql.io.parquet.serde; +import java.time.ZoneId; import java.util.Calendar; import java.util.GregorianCalendar; import java.util.TimeZone; @@ -32,6 +33,12 @@ */ public class TestParquetTimestampUtils extends TestCase { + private 
static final String US_PACIFIC = "US/Pacific"; + private static final String EUROPE_ROME = "Europe/Rome"; + public static final String UTC = "UTC"; + public static final String GMT = "GMT"; + + public void testJulianDay() { //check if May 23, 1968 is Julian Day 2440000 Calendar cal = Calendar.getInstance(); @@ -39,13 +46,13 @@ public void testJulianDay() { cal.set(Calendar.MONTH, Calendar.MAY); cal.set(Calendar.DAY_OF_MONTH, 23); cal.set(Calendar.HOUR_OF_DAY, 0); - cal.setTimeZone(TimeZone.getTimeZone("GMT")); + cal.setTimeZone(TimeZone.getTimeZone(GMT)); Timestamp ts = Timestamp.ofEpochMilli(cal.getTimeInMillis()); - NanoTime nt = NanoTimeUtils.getNanoTime(ts, false); + NanoTime nt = NanoTimeUtils.getNanoTime(ts, false, ZoneId.of(cal.getTimeZone().getID())); Assert.assertEquals(nt.getJulianDay(), 2440000); - Timestamp tsFetched = NanoTimeUtils.getTimestamp(nt, false); + Timestamp tsFetched = NanoTimeUtils.getTimestamp(nt, false, ZoneId.of(cal.getTimeZone().getID())); Assert.assertEquals(tsFetched, ts); //check if 30 Julian Days between Jan 1, 2005 and Jan 31, 2005. @@ -54,12 +61,12 @@ public void testJulianDay() { cal1.set(Calendar.MONTH, Calendar.JANUARY); cal1.set(Calendar.DAY_OF_MONTH, 1); cal1.set(Calendar.HOUR_OF_DAY, 0); - cal1.setTimeZone(TimeZone.getTimeZone("GMT")); + cal1.setTimeZone(TimeZone.getTimeZone(GMT)); Timestamp ts1 = Timestamp.ofEpochMilli(cal1.getTimeInMillis()); - NanoTime nt1 = NanoTimeUtils.getNanoTime(ts1, false); + NanoTime nt1 = NanoTimeUtils.getNanoTime(ts1, false, ZoneId.of(cal1.getTimeZone().getID())); - Timestamp ts1Fetched = NanoTimeUtils.getTimestamp(nt1, false); + Timestamp ts1Fetched = NanoTimeUtils.getTimestamp(nt1, false, ZoneId.of(cal1.getTimeZone().getID())); Assert.assertEquals(ts1Fetched, ts1); Calendar cal2 = Calendar.getInstance(); @@ -67,12 +74,12 @@ public void testJulianDay() { cal2.set(Calendar.MONTH, Calendar.JANUARY); cal2.set(Calendar.DAY_OF_MONTH, 31); cal2.set(Calendar.HOUR_OF_DAY, 0); - cal2.setTimeZone(TimeZone.getTimeZone("UTC")); + cal2.setTimeZone(TimeZone.getTimeZone(UTC)); Timestamp ts2 = Timestamp.ofEpochMilli(cal2.getTimeInMillis()); - NanoTime nt2 = NanoTimeUtils.getNanoTime(ts2, false); + NanoTime nt2 = NanoTimeUtils.getNanoTime(ts2, false, ZoneId.of(cal2.getTimeZone().getID())); - Timestamp ts2Fetched = NanoTimeUtils.getTimestamp(nt2, false); + Timestamp ts2Fetched = NanoTimeUtils.getTimestamp(nt2, false, ZoneId.of(cal2.getTimeZone().getID())); Assert.assertEquals(ts2Fetched, ts2); Assert.assertEquals(nt2.getJulianDay() - nt1.getJulianDay(), 30); @@ -83,12 +90,12 @@ public void testJulianDay() { cal1.set(Calendar.MONTH, Calendar.JANUARY); cal1.set(Calendar.DAY_OF_MONTH, 1); cal1.set(Calendar.HOUR_OF_DAY, 0); - cal1.setTimeZone(TimeZone.getTimeZone("GMT")); + cal1.setTimeZone(TimeZone.getTimeZone(GMT)); ts1 = Timestamp.ofEpochMilli(cal1.getTimeInMillis()); - nt1 = NanoTimeUtils.getNanoTime(ts1, false); + nt1 = NanoTimeUtils.getNanoTime(ts1, false, ZoneId.of(cal1.getTimeZone().getID())); - ts1Fetched = NanoTimeUtils.getTimestamp(nt1, false); + ts1Fetched = NanoTimeUtils.getTimestamp(nt1, false, ZoneId.of(cal1.getTimeZone().getID())); Assert.assertEquals(ts1Fetched, ts1); cal2 = Calendar.getInstance(); @@ -96,12 +103,12 @@ public void testJulianDay() { cal2.set(Calendar.MONTH, Calendar.JANUARY); cal2.set(Calendar.DAY_OF_MONTH, 31); cal2.set(Calendar.HOUR_OF_DAY, 0); - cal2.setTimeZone(TimeZone.getTimeZone("UTC")); + cal2.setTimeZone(TimeZone.getTimeZone(UTC)); ts2 = Timestamp.ofEpochMilli(cal2.getTimeInMillis()); - nt2 = 
NanoTimeUtils.getNanoTime(ts2, false); + nt2 = NanoTimeUtils.getNanoTime(ts2, false, ZoneId.of(cal2.getTimeZone().getID())); - ts2Fetched = NanoTimeUtils.getTimestamp(nt2, false); + ts2Fetched = NanoTimeUtils.getTimestamp(nt2, false, ZoneId.of(cal2.getTimeZone().getID())); Assert.assertEquals(ts2Fetched, ts2); Assert.assertEquals(nt2.getJulianDay() - nt1.getJulianDay(), 1464305); } @@ -115,11 +122,11 @@ public void testNanos() { cal.set(Calendar.HOUR_OF_DAY, 1); cal.set(Calendar.MINUTE, 1); cal.set(Calendar.SECOND, 1); - cal.setTimeZone(TimeZone.getTimeZone("GMT")); + cal.setTimeZone(TimeZone.getTimeZone(GMT)); Timestamp ts = Timestamp.ofEpochMilli(cal.getTimeInMillis(), 1); //(1*60*60 + 1*60 + 1) * 10e9 + 1 - NanoTime nt = NanoTimeUtils.getNanoTime(ts, false); + NanoTime nt = NanoTimeUtils.getNanoTime(ts, false, ZoneId.of(cal.getTimeZone().getID())); Assert.assertEquals(nt.getTimeOfDayNanos(), 3661000000001L); //case 2: 23:59:59.999999999 @@ -130,11 +137,11 @@ public void testNanos() { cal.set(Calendar.HOUR_OF_DAY, 23); cal.set(Calendar.MINUTE, 59); cal.set(Calendar.SECOND, 59); - cal.setTimeZone(TimeZone.getTimeZone("GMT")); + cal.setTimeZone(TimeZone.getTimeZone(GMT)); ts = Timestamp.ofEpochMilli(cal.getTimeInMillis(), 999999999); //(23*60*60 + 59*60 + 59)*10e9 + 999999999 - nt = NanoTimeUtils.getNanoTime(ts, false); + nt = NanoTimeUtils.getNanoTime(ts, false, ZoneId.of(cal.getTimeZone().getID())); Assert.assertEquals(nt.getTimeOfDayNanos(), 86399999999999L); //case 3: verify the difference. @@ -145,7 +152,7 @@ public void testNanos() { cal2.set(Calendar.HOUR_OF_DAY, 0); cal2.set(Calendar.MINUTE, 10); cal2.set(Calendar.SECOND, 0); - cal2.setTimeZone(TimeZone.getTimeZone("GMT")); + cal2.setTimeZone(TimeZone.getTimeZone(GMT)); Timestamp ts2 = Timestamp.ofEpochMilli(cal2.getTimeInMillis(), 10); Calendar cal1 = Calendar.getInstance(); @@ -155,18 +162,18 @@ public void testNanos() { cal1.set(Calendar.HOUR_OF_DAY, 0); cal1.set(Calendar.MINUTE, 0); cal1.set(Calendar.SECOND, 0); - cal1.setTimeZone(TimeZone.getTimeZone("GMT")); + cal1.setTimeZone(TimeZone.getTimeZone(GMT)); Timestamp ts1 = Timestamp.ofEpochMilli(cal1.getTimeInMillis(), 1); - NanoTime n2 = NanoTimeUtils.getNanoTime(ts2, false); - NanoTime n1 = NanoTimeUtils.getNanoTime(ts1, false); + NanoTime n2 = NanoTimeUtils.getNanoTime(ts2, false, ZoneId.of(cal1.getTimeZone().getID())); + NanoTime n1 = NanoTimeUtils.getNanoTime(ts1, false, ZoneId.of(cal1.getTimeZone().getID())); Assert.assertEquals(n2.getTimeOfDayNanos() - n1.getTimeOfDayNanos(), 600000000009L); NanoTime n3 = new NanoTime(n1.getJulianDay() - 1, n1.getTimeOfDayNanos() + TimeUnit.DAYS.toNanos(1)); - Assert.assertEquals(ts1, NanoTimeUtils.getTimestamp(n3, false)); + Assert.assertEquals(ts1, NanoTimeUtils.getTimestamp(n3, false, ZoneId.of(GMT))); n3 = new NanoTime(n1.getJulianDay() + 3, n1.getTimeOfDayNanos() - TimeUnit.DAYS.toNanos(3)); - Assert.assertEquals(ts1, NanoTimeUtils.getTimestamp(n3, false)); + Assert.assertEquals(ts1, NanoTimeUtils.getTimestamp(n3, false, ZoneId.of(GMT))); } public void testTimezone() { @@ -177,8 +184,8 @@ public void testTimezone() { cal.set(Calendar.HOUR_OF_DAY, 17); cal.set(Calendar.MINUTE, 1); cal.set(Calendar.SECOND, 1); - cal.setTimeZone(TimeZone.getTimeZone("US/Pacific")); - Timestamp ts = Timestamp.ofEpochMilli(cal.getTimeInMillis(), 1); + cal.setTimeZone(TimeZone.getTimeZone(US_PACIFIC)); + Timestamp ts = Timestamp.ofEpochMilli(cal.getTimeInMillis(), 1, ZoneId.of(US_PACIFIC)); /** * 17:00 PDT = 00:00 GMT (daylight-savings) @@ -187,7 +194,7 @@ 
public void testTimezone() { * 17:00 PST = 01:00 GMT (if not daylight savings) * (1*60*60 + 1*60 + 1)*10e9 + 1 = 3661000000001 */ - NanoTime nt = NanoTimeUtils.getNanoTime(ts, false); + NanoTime nt = NanoTimeUtils.getNanoTime(ts, false, ZoneId.of(US_PACIFIC)); long timeOfDayNanos = nt.getTimeOfDayNanos(); Assert.assertTrue(timeOfDayNanos == 61000000001L || timeOfDayNanos == 3661000000001L); @@ -205,17 +212,17 @@ public void testTimezonelessValues() { public void testTimezoneless() { Timestamp ts1 = Timestamp.valueOf("2011-01-01 00:30:30.111111111"); - NanoTime nt1 = NanoTimeUtils.getNanoTime(ts1, true); - Assert.assertEquals(nt1.getJulianDay(), 2455562); - Assert.assertEquals(nt1.getTimeOfDayNanos(), 59430111111111L); - Timestamp ts1Fetched = NanoTimeUtils.getTimestamp(nt1, true); + NanoTime nt1 = NanoTimeUtils.getNanoTime(ts1); + Assert.assertEquals(nt1.getJulianDay(), 2455563); + Assert.assertEquals(nt1.getTimeOfDayNanos(), 1830111111111L); + Timestamp ts1Fetched = NanoTimeUtils.getTimestamp(nt1); Assert.assertEquals(ts1Fetched.toString(), ts1.toString()); Timestamp ts2 = Timestamp.valueOf("2011-02-02 08:30:30.222222222"); - NanoTime nt2 = NanoTimeUtils.getNanoTime(ts2, true); + NanoTime nt2 = NanoTimeUtils.getNanoTime(ts2); Assert.assertEquals(nt2.getJulianDay(), 2455595); - Assert.assertEquals(nt2.getTimeOfDayNanos(), 1830222222222L); - Timestamp ts2Fetched = NanoTimeUtils.getTimestamp(nt2, true); + Assert.assertEquals(nt2.getTimeOfDayNanos(), 30630222222222L); + Timestamp ts2Fetched = NanoTimeUtils.getTimestamp(nt2); Assert.assertEquals(ts2Fetched.toString(), ts2.toString()); } @@ -250,8 +257,8 @@ private void valueTest(boolean local) { private void verifyTsString(String tsString, boolean local) { Timestamp ts = Timestamp.valueOf(tsString); - NanoTime nt = NanoTimeUtils.getNanoTime(ts, local); - Timestamp tsFetched = NanoTimeUtils.getTimestamp(nt, local); + NanoTime nt = NanoTimeUtils.getNanoTime(ts, local, ZoneId.of(EUROPE_ROME)); + Timestamp tsFetched = NanoTimeUtils.getTimestamp(nt, local, ZoneId.of(EUROPE_ROME)); Assert.assertEquals(tsString, tsFetched.toString()); } }
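With the changes above, the write path (DataWritableWriter) converts via getNanoTime(ts, false, zoneId) and the read paths (ETypeConverter and TypesFromInt96PageReader) reverse it via getTimestamp(nt, skipConversion, timeZoneId), falling back to UTC when skipConversion is set or no zone is supplied. A short usage sketch, assuming the patched classes are on the classpath; the class name, zone, and timestamp literal are illustrative only:

import java.time.ZoneId;

import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;

// Usage sketch for the patched NanoTimeUtils API; values are illustrative.
public class NanoTimeRoundTripSketch {
  public static void main(String[] args) {
    ZoneId sessionZone = ZoneId.of("US/Pacific");
    Timestamp ts = Timestamp.valueOf("2011-01-01 00:30:30.111111111");

    // Write side: session-local wall clock -> Julian day + nanos-of-day.
    NanoTime nt = NanoTimeUtils.getNanoTime(ts, /* skipConversion */ false, sessionZone);

    // Read side: converting back with the same zone restores the wall-clock value.
    Timestamp back = NanoTimeUtils.getTimestamp(nt, /* skipConversion */ false, sessionZone);
    System.out.println(back.equals(ts)); // expected: true

    // skipConversion forces UTC regardless of the supplied zone, so this matches the
    // new no-extra-argument overload getNanoTime(ts), which delegates to (true, null).
    NanoTime utc = NanoTimeUtils.getNanoTime(ts, /* skipConversion */ true, sessionZone);
    System.out.println(utc.getJulianDay() == NanoTimeUtils.getNanoTime(ts).getJulianDay()
        && utc.getTimeOfDayNanos() == NanoTimeUtils.getNanoTime(ts).getTimeOfDayNanos()); // true
  }
}

This mirrors verifyTsString in the updated test above, which asserts the same round trip with Europe/Rome.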