diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index b27b663..6c13aa5 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1318,9 +1318,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Maximum fraction of heap that can be used by Parquet file writers in one task.\n" + "It is for avoiding OutOfMemory error in tasks. Work with Parquet 1.6.0 and above.\n" + "This config parameter is defined in Parquet, so that it does not start with 'hive.'."), + @Deprecated HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION("hive.parquet.timestamp.skip.conversion", true, - "Current Hive implementation of parquet stores timestamps to UTC, this flag allows skipping of the conversion" + - "on reading parquet files from other tools"), + "Current Hive implementation of parquet stores timestamps to UTC, this flag allows skipping of the conversion" + + "on reading parquet files from other tools"), + PARQUET_INT96_DEFAULT_UTC_WRITE_ZONE("parquet.mr.int96.enable.utc.write.zone", false, + "Enable this variable to use UTC as the default timezone for new Parquet tables."), HIVE_INT_TIMESTAMP_CONVERSION_IN_SECONDS("hive.int.timestamp.conversion.in.seconds", false, "Boolean/tinyint/smallint/int/bigint value is interpreted as milliseconds during the timestamp conversion.\n" + "Set this flag to true to interpret the value as seconds to be consistent with float/double." ), diff --git data/files/impala_int96_timestamp.parq data/files/impala_int96_timestamp.parq new file mode 100644 index 0000000..d67dd14 --- /dev/null +++ data/files/impala_int96_timestamp.parq @@ -0,0 +1 @@ +PAR1L ,šdT-%, &v5tsfn&>&,Hschema%ts&v5tsfn&>&n(Nimpala version 2.3.0-cdh5.5.0 (build 0c891d79aa38f297d244855a32f1e17280e2129b)‘PAR1 \ No newline at end of file diff --git itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java index a14b790..781c4b9 100644 --- itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java +++ itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java @@ -62,6 +62,7 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.ParquetInputSplit; import org.apache.parquet.hadoop.api.ReadSupport; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.openjdk.jmh.annotations.Param; @@ -338,7 +339,7 @@ public RecordReader getVectorizedRecordReader(Path inputPath) throws Exception { Job vectorJob = new Job(conf, "read vector"); ParquetInputFormat.setInputPaths(vectorJob, inputPath); ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class); - InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0); + ParquetInputSplit split = (ParquetInputSplit) parquetInputFormat.getSplits(vectorJob).get(0); initialVectorizedRowBatchCtx(conf); return new VectorizedParquetRecordReader(split, new JobConf(conf)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index adabe70..e674ca4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -113,6 +113,7 @@ import 
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork; import org.apache.hadoop.hive.ql.lockmgr.DbLockManager; @@ -4223,6 +4224,18 @@ private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException { } } + // If PARQUET_INT96_DEFAULT_UTC_WRITE_ZONE is set to True, then set new Parquet tables timezone + // to UTC by default (only if the table property is not set) + if (tbl.getSerializationLib().equals(ParquetHiveSerDe.class.getName())) { + SessionState ss = SessionState.get(); + if (ss.getConf().getBoolVar(ConfVars.PARQUET_INT96_DEFAULT_UTC_WRITE_ZONE)) { + String parquetTimezone = tbl.getProperty(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY); + if (parquetTimezone == null || parquetTimezone.isEmpty()) { + tbl.setProperty(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY, ParquetTableUtils.PARQUET_INT96_DEFAULT_WRITE_ZONE); + } + } + } + // create the table if (crtTbl.getReplaceMode()){ // replace-mode creates are really alters using CreateTableDesc. @@ -4344,6 +4357,12 @@ private int createTableLike(Hive db, CreateTableLikeDesc crtTbl) throws Exceptio if (paramsStr != null) { retainer.addAll(Arrays.asList(paramsStr.split(","))); } + + // Retain Parquet INT96 write zone property to keep Parquet timezone bugfixes. + if (params.get(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY) != null) { + retainer.add(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY); + } + if (!retainer.isEmpty()) { params.keySet().retainAll(retainer); } else { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java index 379a913..bf3eacd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java @@ -18,7 +18,10 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.TimeZone; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils; +import org.apache.parquet.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; @@ -113,6 +116,7 @@ public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws } DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf); + DataWritableWriteSupport.setTimeZone(getParquetWriterTimeZone(tableProperties), jobConf); return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), progress,tableProperties); @@ -128,4 +132,21 @@ protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper( return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), progress,tableProperties); } + + private TimeZone getParquetWriterTimeZone(Properties tableProperties) { + // PARQUET_INT96_WRITE_ZONE_PROPERTY is a table property used to detect what timezone + // conversion to use when writing Parquet timestamps. 
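+    // For illustration (value is hypothetical): a table created with
+    // TBLPROPERTIES ('parquet.mr.int96.write.zone'='America/Chicago') makes this method return
+    // the America/Chicago TimeZone, while an ID that is not listed in TimeZone.getAvailableIDs()
+    // is rejected by the check below instead of being silently mapped to GMT.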
+ String timeZoneID = + tableProperties.getProperty(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY); + if (!Strings.isNullOrEmpty(timeZoneID)) { + if (!Arrays.asList(TimeZone.getAvailableIDs()).contains(timeZoneID)) { + throw new IllegalStateException("Unexpected timezone id found for parquet int96 conversion: " + timeZoneID); + } + return TimeZone.getTimeZone(timeZoneID); + } + + // If no timezone is defined in table properties, then write timestamps using + // PARQUET_INT96_DEFAULT_WRITE_ZONE timezone + return TimeZone.getTimeZone(ParquetTableUtils.PARQUET_INT96_DEFAULT_WRITE_ZONE); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java index 167f9b6..bc412a5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java @@ -14,10 +14,12 @@ package org.apache.hadoop.hive.ql.io.parquet; import com.google.common.base.Strings; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde2.SerDeStats; @@ -26,6 +28,7 @@ import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.RowGroupFilter; import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.parquet.hadoop.ParquetInputSplit; @@ -41,14 +44,15 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.TimeZone; public class ParquetRecordReaderBase { public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderBase.class); protected Path file; protected ProjectionPusher projectionPusher; - protected boolean skipTimestampConversion = false; protected SerDeStats serDeStats; protected JobConf jobConf; @@ -70,6 +74,11 @@ protected ParquetInputSplit getSplit( final JobConf conf ) throws IOException { ParquetInputSplit split; + + if (oldSplit == null) { + return null; + } + if (oldSplit instanceof FileSplit) { final Path finalPath = ((FileSplit) oldSplit).getPath(); jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent()); @@ -122,9 +131,6 @@ protected ParquetInputSplit getSplit( filtedBlocks = splitGroup; } - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) { - skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr"); - } split = new ParquetInputSplit(finalPath, splitStart, splitLength, @@ -140,6 +146,51 @@ protected ParquetInputSplit getSplit( } } + /** + * Sets the TimeZone conversion for Parquet timestamp columns. 
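+   * Files whose footer metadata shows they were not written by parquet-mr (for example files
+   * written by Impala), and reads with hive.parquet.timestamp.skip.conversion enabled, are
+   * treated as UTC; otherwise the table-level parquet.mr.int96.write.zone property is used,
+   * falling back to UTC when the property is not set.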
+ * + * @param configuration Configuration object where to get and set the TimeZone conversion + * @param finalPath path to the parquet file + */ + protected void setTimeZoneConversion(Configuration configuration, Path finalPath) { + ParquetMetadata parquetMetadata; + String timeZoneID; + + try { + parquetMetadata = ParquetFileReader.readFooter(configuration, finalPath, + ParquetMetadataConverter.NO_FILTER); + } catch (IOException e) { + // If an error occurred while reading the file, then we just skip the TimeZone setting. + // This error will probably occur on any other part of the code. + LOG.debug("Could not read parquet file footer at " + finalPath + ". Cannot determine " + + "parquet file timezone", e); + return; + } + + boolean skipConversion = HiveConf.getBoolVar(configuration, + HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION); + FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); + if (!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr") || + skipConversion) { + // Impala writes timestamp values using GMT only. We should not try to convert Impala + // files to other type of timezones. + timeZoneID = ParquetTableUtils.PARQUET_INT96_DEFAULT_WRITE_ZONE; + } else { + // TABLE_PARQUET_INT96_TIMEZONE is a table property used to detect what timezone conversion + // to use when reading Parquet timestamps. + timeZoneID = configuration.get(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY, + ParquetTableUtils.PARQUET_INT96_DEFAULT_WRITE_ZONE); + + if (!Arrays.asList(TimeZone.getAvailableIDs()).contains(timeZoneID)) { + throw new IllegalStateException("Unexpected timezone id found for parquet int96 conversion: " + timeZoneID); + } + } + + // 'timeZoneID' should be valid, since we did not throw exception above + configuration.set(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY, + TimeZone.getTimeZone(timeZoneID).getID()); + } + public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf); if (sarg == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java index 76d93b8..f4ad083 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java @@ -16,9 +16,11 @@ import java.math.BigDecimal; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Calendar; import java.util.Map; +import java.util.TimeZone; -import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; import org.apache.hadoop.hive.serde.serdeConstants; @@ -36,6 +38,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.parquet.Strings; import org.apache.parquet.column.Dictionary; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.PrimitiveConverter; @@ -193,16 +196,21 @@ protected HiveDecimalWritable convert(Binary binary) { ETIMESTAMP_CONVERTER(TimestampWritable.class) { @Override PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) { + Map metadata = parent.getMetadata(); + + // This variable must be initialized only once to keep good read 
performance while doing conversion of timestamps values. + final Calendar calendar; + if (Strings.isNullOrEmpty(metadata.get(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY))) { + // Local time should be used if timezone is not available. + calendar = Calendar.getInstance(); + } else { + calendar = Calendar.getInstance(TimeZone.getTimeZone(metadata.get(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY))); + } + return new BinaryConverter(type, parent, index) { @Override protected TimestampWritable convert(Binary binary) { - NanoTime nt = NanoTime.fromBinary(binary); - Map metadata = parent.getMetadata(); - //Current Hive parquet timestamp implementation stores it in UTC, but other components do not do that. - //If this file written by current Hive implementation itself, we need to do the reverse conversion, else skip the conversion. - boolean skipConversion = Boolean.parseBoolean( - metadata.get(HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION.varname)); - Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipConversion); + Timestamp ts = NanoTimeUtils.getTimestamp(NanoTime.fromBinary(binary), calendar); return new TimestampWritable(ts); } }; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java index 604cbbc..65178cf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java @@ -16,7 +16,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.ListIterator; import java.util.Map; @@ -27,6 +26,7 @@ import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.FieldNode; import org.apache.hadoop.hive.ql.optimizer.NestedColumnFieldPruningUtils; @@ -59,9 +59,9 @@ * */ public class DataWritableReadSupport extends ReadSupport { - public static final String HIVE_TABLE_AS_PARQUET_SCHEMA = "HIVE_TABLE_SCHEMA"; public static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.column.index.access"; + private TypeInfo hiveTypeInfo; /** * From a string which columns names (including hive column), return a list @@ -349,6 +349,11 @@ private static GroupType buildProjectedGroupType( Map contextMetadata = new HashMap(); boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false); + // Adds the PARQUET_INT96_WRITE_ZONE_PROPERTY value to the metadata object so that it passes the timezone + // to the Parquet readers. PARQUET_INT96_WRITE_ZONE_PROPERTY is set on ParquetRecordReaderWrapper. 
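+    // The value placed in this metadata map is read back by ETypeConverter's INT96 converter,
+    // which builds its Calendar from it; a missing or empty value there means the reader falls
+    // back to the local time zone.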
+ contextMetadata.put(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY, + configuration.get(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY)); + if (columnNames != null) { List columnNamesList = getColumnNames(columnNames); String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES); @@ -402,16 +407,6 @@ private static GroupType buildProjectedGroupType( public RecordMaterializer prepareForRead(final Configuration configuration, final Map keyValueMetaData, final MessageType fileSchema, final org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) { - final Map metadata = readContext.getReadSupportMetadata(); - if (metadata == null) { - throw new IllegalStateException("ReadContext not initialized properly. " + - "Don't know the Hive Schema."); - } - String key = HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION.varname; - if (!metadata.containsKey(key)) { - metadata.put(key, String.valueOf(HiveConf.getBoolVar( - configuration, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION))); - } - return new DataWritableRecordConverter(readContext.getRequestedSchema(), metadata, hiveTypeInfo); + return new DataWritableRecordConverter(readContext.getRequestedSchema(), readContext.getReadSupportMetadata(), hiveTypeInfo); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java index ac430a6..66fca1a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java @@ -16,10 +16,13 @@ import java.io.IOException; import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher; @@ -27,10 +30,6 @@ import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -80,13 +79,14 @@ public ParquetRecordReaderWrapper( } // create a TaskInputOutputContext - Configuration conf = jobConf; - if (skipTimestampConversion ^ HiveConf.getBoolVar( - conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) { - conf = new JobConf(oldJobConf); - HiveConf.setBoolVar(conf, - HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION, skipTimestampConversion); - } + // TODO: This line is left due to incorrect Predicate push down results (parquet_ppd_char,parquet_ppd_varchar). + // The problem is that Parquet PPD is set on getSplit() function called above, but the old code used this + // line to overwrite such configuration. I'm adding a fix to timestamp issues only, so we should follow up + // this issue in another JIRA. 
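+    // Note that the copy below is now taken unconditionally, whereas the old code only copied
+    // the conf when the split's skip-conversion flag disagreed with the session-level flag.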
+ JobConf conf = new JobConf(oldJobConf); + + // Set the TimeZone conversion in case the file has timestamp columns. + setTimeZoneConversion(conf, ((FileSplit)oldSplit).getPath()); final TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID); if (split != null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetTableUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetTableUtils.java new file mode 100644 index 0000000..fd37891 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetTableUtils.java @@ -0,0 +1,22 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.parquet.serde; + +public class ParquetTableUtils { + // Parquet table properties + public static final String PARQUET_INT96_WRITE_ZONE_PROPERTY = "parquet.mr.int96.write.zone"; + + // Parquet table default values + public static final String PARQUET_INT96_DEFAULT_WRITE_ZONE = "UTC"; +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java index 3fd75d2..049cd33 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java @@ -16,98 +16,164 @@ import java.sql.Timestamp; import java.util.Calendar; import java.util.GregorianCalendar; +import java.util.Objects; import java.util.TimeZone; import java.util.concurrent.TimeUnit; import jodd.datetime.JDateTime; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils; /** * Utilities for converting from java.sql.Timestamp to parquet timestamp. * This utilizes the Jodd library. */ public class NanoTimeUtils { - static final long NANOS_PER_HOUR = TimeUnit.HOURS.toNanos(1); - static final long NANOS_PER_MINUTE = TimeUnit.MINUTES.toNanos(1); - static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1); - static final long NANOS_PER_DAY = TimeUnit.DAYS.toNanos(1); - - private static final ThreadLocal parquetGMTCalendar = new ThreadLocal(); - private static final ThreadLocal parquetLocalCalendar = new ThreadLocal(); - - private static Calendar getGMTCalendar() { - //Calendar.getInstance calculates the current-time needlessly, so cache an instance. - if (parquetGMTCalendar.get() == null) { - parquetGMTCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT"))); - } - return parquetGMTCalendar.get(); - } - - private static Calendar getLocalCalendar() { - if (parquetLocalCalendar.get() == null) { - parquetLocalCalendar.set(Calendar.getInstance()); - } - return parquetLocalCalendar.get(); - } - - public static Calendar getCalendar(boolean skipConversion) { - Calendar calendar = skipConversion ? 
getLocalCalendar() : getGMTCalendar(); - calendar.clear(); // Reset all fields before reusing this instance - return calendar; - } - - public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion) { - - Calendar calendar = getCalendar(skipConversion); - calendar.setTime(ts); - int year = calendar.get(Calendar.YEAR); - if (calendar.get(Calendar.ERA) == GregorianCalendar.BC) { - year = 1 - year; - } - JDateTime jDateTime = new JDateTime(year, - calendar.get(Calendar.MONTH) + 1, //java calendar index starting at 1. - calendar.get(Calendar.DAY_OF_MONTH)); - int days = jDateTime.getJulianDayNumber(); - - long hour = calendar.get(Calendar.HOUR_OF_DAY); - long minute = calendar.get(Calendar.MINUTE); - long second = calendar.get(Calendar.SECOND); - long nanos = ts.getNanos(); - long nanosOfDay = nanos + NANOS_PER_SECOND * second + NANOS_PER_MINUTE * minute + - NANOS_PER_HOUR * hour; - - return new NanoTime(days, nanosOfDay); - } - - public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion) { - int julianDay = nt.getJulianDay(); - long nanosOfDay = nt.getTimeOfDayNanos(); - - long remainder = nanosOfDay; - julianDay += remainder / NANOS_PER_DAY; - remainder %= NANOS_PER_DAY; - if (remainder < 0) { - remainder += NANOS_PER_DAY; - julianDay--; - } - - JDateTime jDateTime = new JDateTime((double) julianDay); - Calendar calendar = getCalendar(skipConversion); - calendar.set(Calendar.YEAR, jDateTime.getYear()); - calendar.set(Calendar.MONTH, jDateTime.getMonth() - 1); //java calendar index starting at 1. - calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay()); - - int hour = (int) (remainder / (NANOS_PER_HOUR)); - remainder = remainder % (NANOS_PER_HOUR); - int minutes = (int) (remainder / (NANOS_PER_MINUTE)); - remainder = remainder % (NANOS_PER_MINUTE); - int seconds = (int) (remainder / (NANOS_PER_SECOND)); - long nanos = remainder % NANOS_PER_SECOND; - - calendar.set(Calendar.HOUR_OF_DAY, hour); - calendar.set(Calendar.MINUTE, minutes); - calendar.set(Calendar.SECOND, seconds); - Timestamp ts = new Timestamp(calendar.getTimeInMillis()); - ts.setNanos((int) nanos); - return ts; - } + private static final long NANOS_PER_HOUR = TimeUnit.HOURS.toNanos(1); + private static final long NANOS_PER_MINUTE = TimeUnit.MINUTES.toNanos(1); + private static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1); + private static final long NANOS_PER_DAY = TimeUnit.DAYS.toNanos(1); + + private static final ThreadLocal parquetUTCCalendar = new ThreadLocal(); + private static final ThreadLocal parquetLocalCalendar = new ThreadLocal(); + + private static Calendar getUTCCalendar() { + //Calendar.getInstance calculates the current-time needlessly, so cache an instance. + if (parquetUTCCalendar.get() == null) { + parquetUTCCalendar.set(Calendar.getInstance( + TimeZone.getTimeZone(ParquetTableUtils.PARQUET_INT96_DEFAULT_WRITE_ZONE))); + } + return parquetUTCCalendar.get(); + } + + private static Calendar getLocalCalendar() { + if (parquetLocalCalendar.get() == null) { + parquetLocalCalendar.set(Calendar.getInstance()); + } + return parquetLocalCalendar.get(); + } + + public static Calendar getCalendar(boolean skipConversion) { + Calendar calendar = skipConversion ? 
Calendar.getInstance( + TimeZone.getTimeZone(ParquetTableUtils.PARQUET_INT96_DEFAULT_WRITE_ZONE)) + : Calendar.getInstance(); + calendar.clear(); // Reset all fields before reusing this instance + return calendar; + } + + @Deprecated + public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion) { + return getNanoTime(ts, getCalendar(skipConversion)); + } + + /** + * Constructs a julian date from the floating time Timestamp. + * If the timezone of the calendar is different from the current local + * timezone, then the timestamp value will be adjusted. + * Possible adjustments: + * - UTC Ts -> Local Ts copied to TableTZ Calendar -> UTC Ts -> JD + * @param ts floating time timestamp to store + * @param calendar timezone used to store the timestamp in parquet + * @return adjusted julian date + */ + public static NanoTime getNanoTime(Timestamp ts, Calendar calendar) { + + Calendar localCalendar = getLocalCalendar(); + localCalendar.setTimeInMillis(ts.getTime()); + + Calendar adjustedCalendar = copyToCalendaWithTZ(localCalendar, calendar.getTimeZone()); + + Calendar utcCalendar = getUTCCalendar(); + utcCalendar.setTimeInMillis(adjustedCalendar.getTimeInMillis()); + + int year = utcCalendar.get(Calendar.YEAR); + if (utcCalendar.get(Calendar.ERA) == GregorianCalendar.BC) { + year = 1 - year; + } + JDateTime jDateTime = new JDateTime(year, + utcCalendar.get(Calendar.MONTH) + 1, //java calendar index starting at 1. + utcCalendar.get(Calendar.DAY_OF_MONTH)); + int days = jDateTime.getJulianDayNumber(); + + long hour = utcCalendar.get(Calendar.HOUR_OF_DAY); + long minute = utcCalendar.get(Calendar.MINUTE); + long second = utcCalendar.get(Calendar.SECOND); + long nanos = ts.getNanos(); + long nanosOfDay = nanos + NANOS_PER_SECOND * second + NANOS_PER_MINUTE * minute + + NANOS_PER_HOUR * hour; + + return new NanoTime(days, nanosOfDay); + } + + @Deprecated + public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion) { + return getTimestamp(nt, getCalendar(skipConversion)); + } + + /** + * Constructs a floating time Timestamp from the julian date contained in NanoTime. + * If the timezone of the calendar is different from the current local + * timezone, then the timestamp value will be adjusted. + * Possible adjustments: + * - JD -> UTC Ts -> TableTZ Calendar copied to LocalTZ Calendar -> UTC Ts + * @param nt stored julian date + * @param calendar timezone used when storing the timestamp in parquet + * @return floating time represented as a timestamp. Guaranteed to display + * the same when formatted using the current local timezone as with the local + * timezone at the time it was stored. + */ + public static Timestamp getTimestamp(NanoTime nt, Calendar calendar) { + int julianDay = nt.getJulianDay(); + long nanosOfDay = nt.getTimeOfDayNanos(); + + long remainder = nanosOfDay; + julianDay += remainder / NANOS_PER_DAY; + remainder %= NANOS_PER_DAY; + if (remainder < 0) { + remainder += NANOS_PER_DAY; + julianDay--; + } + + JDateTime jDateTime = new JDateTime((double) julianDay); + + Calendar utcCalendar = getUTCCalendar(); + utcCalendar.clear(); + utcCalendar.set(Calendar.YEAR, jDateTime.getYear()); + utcCalendar.set(Calendar.MONTH, jDateTime.getMonth() - 1); //java calendar index starting at 1. 
+ utcCalendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay()); + + int hour = (int) (remainder / (NANOS_PER_HOUR)); + remainder = remainder % (NANOS_PER_HOUR); + int minutes = (int) (remainder / (NANOS_PER_MINUTE)); + remainder = remainder % (NANOS_PER_MINUTE); + int seconds = (int) (remainder / (NANOS_PER_SECOND)); + long nanos = remainder % NANOS_PER_SECOND; + + utcCalendar.set(Calendar.HOUR_OF_DAY, hour); + utcCalendar.set(Calendar.MINUTE, minutes); + utcCalendar.set(Calendar.SECOND, seconds); + + calendar.setTimeInMillis(utcCalendar.getTimeInMillis()); + + Calendar adjusterCalendar = copyToCalendaWithTZ(calendar, TimeZone.getDefault()); + + Timestamp ts = new Timestamp(adjusterCalendar.getTimeInMillis()); + ts.setNanos((int) nanos); + return ts; + } + + private static Calendar copyToCalendaWithTZ(Calendar from, TimeZone tz) { + if(from.getTimeZone().getID().equals(tz.getID())) { + return from; + } else { + Calendar to = Calendar.getInstance(tz); + to.set(Calendar.ERA, from.get(Calendar.ERA)); + to.set(Calendar.YEAR, from.get(Calendar.YEAR)); + to.set(Calendar.MONTH, from.get(Calendar.MONTH)); + to.set(Calendar.DAY_OF_MONTH, from.get(Calendar.DAY_OF_MONTH)); + to.set(Calendar.HOUR_OF_DAY, from.get(Calendar.HOUR_OF_DAY)); + to.set(Calendar.MINUTE, from.get(Calendar.MINUTE)); + to.set(Calendar.SECOND, from.get(Calendar.SECOND)); + return to; + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index b6a1a7a..6ca1963 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -21,14 +21,15 @@ import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase; import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher; import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.parquet.ParquetRuntimeException; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; @@ -97,12 +98,15 @@ @VisibleForTesting public VectorizedParquetRecordReader( - InputSplit inputSplit, - JobConf conf) { + ParquetInputSplit inputSplit, + JobConf conf) { try { serDeStats = new SerDeStats(); projectionPusher = new ProjectionPusher(); - initialize(inputSplit, conf); + if (inputSplit != null) { + initialize(inputSplit, conf); + setTimeZoneConversion(jobConf, inputSplit.getPath()); + } colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf); rbCtx = Utilities.getVectorizedRowBatchCtx(conf); } catch (Throwable e) { @@ -117,7 +121,10 @@ public VectorizedParquetRecordReader( try { serDeStats = new SerDeStats(); projectionPusher = new ProjectionPusher(); - initialize(getSplit(oldInputSplit, conf), conf); + if (oldInputSplit != null) { + initialize(getSplit(oldInputSplit, conf), conf); + setTimeZoneConversion(jobConf, ((FileSplit) 
oldInputSplit).getPath()); + } colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf); rbCtx = Utilities.getVectorizedRowBatchCtx(conf); } catch (Throwable e) { @@ -127,16 +134,12 @@ public VectorizedParquetRecordReader( } public void initialize( - InputSplit oldSplit, - JobConf configuration) throws IOException, InterruptedException { - // the oldSplit may be null during the split phase - if (oldSplit == null) { - return; - } + ParquetInputSplit split, + JobConf configuration) throws IOException, InterruptedException { + jobConf = configuration; ParquetMetadata footer; List blocks; - ParquetInputSplit split = (ParquetInputSplit) oldSplit; boolean indexAccess = configuration.getBoolean(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS, false); this.file = split.getPath(); @@ -287,17 +290,18 @@ private void checkEndOfRowGroup() throws IOException { List columns = requestedSchema.getColumns(); List types = requestedSchema.getFields(); columnReaders = new VectorizedColumnReader[columns.size()]; + String timeZoneId = jobConf.get(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY); if (!ColumnProjectionUtils.isReadAllColumns(jobConf) && !indexColumnsWanted.isEmpty()) { for (int i = 0; i < types.size(); ++i) { columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(indexColumnsWanted.get(i)), types.get(i), - pages, requestedSchema.getColumns(), skipTimestampConversion, 0); + pages, requestedSchema.getColumns(), timeZoneId, 0); } } else { for (int i = 0; i < types.size(); ++i) { columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(i), types.get(i), pages, - requestedSchema.getColumns(), skipTimestampConversion, 0); + requestedSchema.getColumns(), timeZoneId, 0); } } @@ -326,7 +330,7 @@ private VectorizedColumnReader buildVectorizedParquetReader( Type type, PageReadStore pages, List columnDescriptors, - boolean skipTimestampConversion, + String conversionTimeZone, int depth) throws IOException { List descriptors = getAllColumnDescriptorByType(depth, type, columnDescriptors); @@ -337,7 +341,7 @@ private VectorizedColumnReader buildVectorizedParquetReader( "Failed to find related Parquet column descriptor with type " + type); } else { return new VectorizedPrimitiveColumnReader(descriptors.get(0), - pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type); + pages.getPageReader(descriptors.get(0)), conversionTimeZone, type); } case STRUCT: StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo; @@ -347,7 +351,7 @@ private VectorizedColumnReader buildVectorizedParquetReader( for (int i = 0; i < fieldTypes.size(); i++) { VectorizedColumnReader r = buildVectorizedParquetReader(fieldTypes.get(i), types.get(i), pages, descriptors, - skipTimestampConversion, depth + 1); + conversionTimeZone, depth + 1); if (r != null) { fieldReaders.add(r); } else { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java index 3d5c6e6..c27e7d9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java @@ -13,6 +13,7 @@ */ package org.apache.hadoop.hive.ql.io.parquet.vector; +import com.google.common.base.Strings; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import 
org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; @@ -45,6 +46,8 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.sql.Timestamp; +import java.util.Calendar; +import java.util.TimeZone; import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; @@ -58,7 +61,7 @@ private static final Logger LOG = LoggerFactory.getLogger(VectorizedPrimitiveColumnReader.class); - private boolean skipTimestampConversion = false; + private String conversionTimeZone; /** * Total number of values read. @@ -108,13 +111,13 @@ public VectorizedPrimitiveColumnReader( ColumnDescriptor descriptor, PageReader pageReader, - boolean skipTimestampConversion, + String conversionTimeZone, Type type) throws IOException { this.descriptor = descriptor; this.type = type; this.pageReader = pageReader; this.maxDefLevel = descriptor.getMaxDefinitionLevel(); - this.skipTimestampConversion = skipTimestampConversion; + this.conversionTimeZone = conversionTimeZone; DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); if (dictionaryPage != null) { @@ -411,13 +414,20 @@ private void decodeDictionaryIds( } break; case INT96: + final Calendar calendar; + if (Strings.isNullOrEmpty(this.conversionTimeZone)) { + // Local time should be used if no timezone is specified + calendar = Calendar.getInstance(); + } else { + calendar = Calendar.getInstance(TimeZone.getTimeZone(this.conversionTimeZone)); + } for (int i = rowId; i < rowId + num; ++i) { ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer(); buf.order(ByteOrder.LITTLE_ENDIAN); long timeOfDayNanos = buf.getLong(); int julianDay = buf.getInt(); NanoTime nt = new NanoTime(julianDay, timeOfDayNanos); - Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion); + Timestamp ts = NanoTimeUtils.getTimestamp(nt, calendar); ((TimestampColumnVector) column).set(i, ts); } break; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java index f4621e5..71a78cf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java @@ -14,6 +14,7 @@ package org.apache.hadoop.hive.ql.io.parquet.write; import java.util.HashMap; +import java.util.TimeZone; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; @@ -31,9 +32,11 @@ public class DataWritableWriteSupport extends WriteSupport { public static final String PARQUET_HIVE_SCHEMA = "parquet.hive.schema"; + private static final String PARQUET_TIMEZONE_CONVERSION = "parquet.hive.timezone"; private DataWritableWriter writer; private MessageType schema; + private TimeZone timeZone; public static void setSchema(final MessageType schema, final Configuration configuration) { configuration.set(PARQUET_HIVE_SCHEMA, schema.toString()); @@ -43,15 +46,24 @@ public static MessageType getSchema(final Configuration configuration) { return MessageTypeParser.parseMessageType(configuration.get(PARQUET_HIVE_SCHEMA)); } + public static void setTimeZone(final TimeZone timeZone, final Configuration configuration) { + configuration.set(PARQUET_TIMEZONE_CONVERSION, timeZone.getID()); + } + + public static TimeZone getTimeZone(final Configuration configuration) { + return 
TimeZone.getTimeZone(configuration.get(PARQUET_TIMEZONE_CONVERSION)); + } + @Override public WriteContext init(final Configuration configuration) { schema = getSchema(configuration); + timeZone = getTimeZone(configuration); return new WriteContext(schema, new HashMap()); } @Override public void prepareForWrite(final RecordConsumer recordConsumer) { - writer = new DataWritableWriter(recordConsumer, schema); + writer = new DataWritableWriter(recordConsumer, schema, timeZone); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java index 6b7b50a..a400fa2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java @@ -49,8 +49,10 @@ import java.sql.Date; import java.sql.Timestamp; +import java.util.Calendar; import java.util.List; import java.util.Map; +import java.util.TimeZone; /** * @@ -62,14 +64,16 @@ private static final Logger LOG = LoggerFactory.getLogger(DataWritableWriter.class); protected final RecordConsumer recordConsumer; private final GroupType schema; + private final TimeZone timeZone; /* This writer will be created when writing the first row in order to get information about how to inspect the record data. */ private DataWriter messageWriter; - public DataWritableWriter(final RecordConsumer recordConsumer, final GroupType schema) { + public DataWritableWriter(final RecordConsumer recordConsumer, final GroupType schema, final TimeZone timeZone) { this.recordConsumer = recordConsumer; this.schema = schema; + this.timeZone = timeZone; } /** @@ -488,15 +492,17 @@ public void write(Object value) { private class TimestampDataWriter implements DataWriter { private TimestampObjectInspector inspector; + private Calendar calendar; public TimestampDataWriter(TimestampObjectInspector inspector) { this.inspector = inspector; + this.calendar = Calendar.getInstance(timeZone); } @Override public void write(Object value) { Timestamp ts = inspector.getPrimitiveJavaObject(value); - recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary()); + recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, calendar).toBinary()); } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java index 934ae9f..74ec728 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java @@ -16,13 +16,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter; -import org.apache.hadoop.hive.serde2.io.ByteWritable; -import org.apache.hadoop.hive.serde2.io.ShortWritable; -import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; + import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import 
org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -45,10 +48,14 @@ import org.apache.parquet.schema.MessageTypeParser; import java.io.UnsupportedEncodingException; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; +import java.util.Calendar; import java.util.List; import java.util.Properties; +import java.util.TimeZone; + import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -104,6 +111,10 @@ private void addString(String value) { inOrder.verify(mockRecordConsumer).addBinary(Binary.fromString(value)); } + private void addBinary(Binary value) { + inOrder.verify(mockRecordConsumer).addBinary(value); + } + private void startGroup() { inOrder.verify(mockRecordConsumer).startGroup(); } @@ -136,6 +147,10 @@ private BooleanWritable createBoolean(boolean value) { return new BooleanWritable(value); } + private TimestampWritable createTimestamp(Timestamp value) { + return new TimestampWritable(value); + } + private BytesWritable createString(String value) throws UnsupportedEncodingException { return new BytesWritable(value.getBytes("UTF-8")); } @@ -151,7 +166,7 @@ private ArrayWritable createArray(Writable...values) { private List createHiveColumnsFrom(final String columnNamesStr) { List columnNames; if (columnNamesStr.length() == 0) { - columnNames = new ArrayList(); + columnNames = new ArrayList<>(); } else { columnNames = Arrays.asList(columnNamesStr.split(",")); } @@ -191,12 +206,52 @@ private ParquetHiveRecord getParquetWritable(String columnNames, String columnTy } private void writeParquetRecord(String schema, ParquetHiveRecord record) throws SerDeException { + writeParquetRecord(schema, record, TimeZone.getTimeZone("GMT")); + } + + private void writeParquetRecord(String schema, ParquetHiveRecord record, TimeZone timeZone) throws SerDeException { MessageType fileSchema = MessageTypeParser.parseMessageType(schema); - DataWritableWriter hiveParquetWriter = new DataWritableWriter(mockRecordConsumer, fileSchema); + DataWritableWriter hiveParquetWriter = new DataWritableWriter(mockRecordConsumer, fileSchema, timeZone); hiveParquetWriter.write(record); } @Test + public void testTimestampInt96() throws Exception { + String columnNames = "ts"; + String columnTypes = "timestamp"; + + String fileSchema = "message hive_schema {\n" + + " optional int96 ts;\n" + + "}\n"; + + ArrayWritable hiveRecord = createGroup( + createTimestamp(Timestamp.valueOf("2016-01-01 01:01:01")) + ); + + // Write record to Parquet format using CST timezone + writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord), TimeZone.getTimeZone("CST")); + + // Verify record was written correctly to Parquet + startMessage(); + startField("ts", 0); + addBinary(NanoTimeUtils.getNanoTime(Timestamp.valueOf("2016-01-01 01:01:01"), + Calendar.getInstance(TimeZone.getTimeZone("CST"))).toBinary()); + endField("ts", 0); + endMessage(); + + // Write record to Parquet format using PST timezone + writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord), TimeZone.getTimeZone("PST")); + + // Verify record was written correctly to Parquet + startMessage(); + startField("ts", 0); + addBinary(NanoTimeUtils.getNanoTime(Timestamp.valueOf("2016-01-01 01:01:01"), + Calendar.getInstance(TimeZone.getTimeZone("PST"))).toBinary()); + endField("ts", 0); + endMessage(); 
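+    // With the same wall-clock input, the CST and PST binaries verified above are expected to
+    // differ by the two-hour offset between those zones, since getNanoTime() adjusts the local
+    // value to the writer time zone before encoding the Julian day and nanos-of-day.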
+ } + + @Test public void testSimpleType() throws Exception { String columnNames = "int,double,boolean,float,string,tinyint,smallint,bigint"; String columnTypes = "int,double,boolean,float,string,tinyint,smallint,bigint"; diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java index 670bfa6..f4f6e88 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java @@ -106,7 +106,7 @@ public void testNullSplitForParquetReader() throws Exception { HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp"); initialVectorizedRowBatchCtx(conf); VectorizedParquetRecordReader reader = - new VectorizedParquetRecordReader((InputSplit)null, new JobConf(conf)); + new VectorizedParquetRecordReader((org.apache.hadoop.mapred.InputSplit)null, new JobConf(conf)); assertFalse(reader.next(reader.createKey(), reader.createValue())); } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java index f537cee..f2d79cf 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java @@ -47,6 +47,7 @@ import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.ParquetInputSplit; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.example.GroupWriteSupport; @@ -222,7 +223,7 @@ protected VectorizedParquetRecordReader createParquetReader(String schemaString, Job vectorJob = new Job(conf, "read vector"); ParquetInputFormat.setInputPaths(vectorJob, file); ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class); - InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0); + ParquetInputSplit split = (ParquetInputSplit) parquetInputFormat.getSplits(vectorJob).get(0); initialVectorizedRowBatchCtx(conf); return new VectorizedParquetRecordReader(split, new JobConf(conf)); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java new file mode 100644 index 0000000..2344d63 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.parquet.convert; + +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils; +import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.Writable; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Timestamp; +import java.util.Calendar; +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; + +public class TestETypeConverter { + + private ConverterParentHelper parent; + private Timestamp ts; + + @Before + public void init() { + parent = new ConverterParentHelper(); + ts = Timestamp.valueOf("2011-01-01 01:01:01.111111111"); + } + /** + * This class helps to compare a Writable value pushed to the ConverterParent class. + */ + private class ConverterParentHelper implements ConverterParent { + private Writable value; + private Map metadata = new HashMap<>(); + + /** + * The set() method is called from within addXXXX() PrimitiveConverter methods. + */ + @Override + public void set(int index, Writable value) { + this.value = value; + } + + @Override + public Map getMetadata() { + return metadata; + } + + public void assertWritableValue(Writable expected) { + assertEquals(expected.getClass(), value.getClass()); + assertEquals("Writable value set to Parent is different than expected", expected, value); + } + } + + private PrimitiveConverter getETypeConverter(ConverterParent parent, PrimitiveTypeName typeName, TypeInfo type) { + return ETypeConverter.getNewConverter(new PrimitiveType(Type.Repetition.REQUIRED, typeName, "field"), 0, parent, type); + } + + @Test + public void testTimestampInt96ConverterLocal() { + PrimitiveConverter converter; + + // Default timezone should be Localtime + converter = getETypeConverter(parent, PrimitiveTypeName.INT96, TypeInfoFactory.timestampTypeInfo); + converter.addBinary(NanoTimeUtils.getNanoTime(ts, Calendar.getInstance()).toBinary()); + parent.assertWritableValue(new TimestampWritable(ts)); + } + + @Test + public void testTimestampInt96ConverterGMT() { + PrimitiveConverter converter; + + parent.metadata.put(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY, "GMT"); + converter = getETypeConverter(parent, PrimitiveTypeName.INT96, TypeInfoFactory.timestampTypeInfo); + converter.addBinary(NanoTimeUtils.getNanoTime(ts, + Calendar.getInstance(TimeZone.getTimeZone("GMT"))).toBinary()); + parent.assertWritableValue(new TimestampWritable(ts)); + + } + + @Test + public void testTimestampInt96ConverterChicago() { + PrimitiveConverter converter; + + parent.metadata.put(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY, "America/Chicago"); + converter = getETypeConverter(parent, PrimitiveTypeName.INT96, TypeInfoFactory.timestampTypeInfo); + converter.addBinary(NanoTimeUtils.getNanoTime(ts, + Calendar.getInstance(TimeZone.getTimeZone("America/Chicago"))).toBinary()); + parent.assertWritableValue(new TimestampWritable(ts)); + } + + @Test + public void testTimestampInt96ConverterEtc() { + 
PrimitiveConverter converter; + + parent.metadata.put(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY, "Etc/GMT-12"); + converter = getETypeConverter(parent, PrimitiveTypeName.INT96, TypeInfoFactory.timestampTypeInfo); + converter.addBinary(NanoTimeUtils.getNanoTime(ts, + Calendar.getInstance(TimeZone.getTimeZone("Etc/GMT-12"))).toBinary()); + parent.assertWritableValue(new TimestampWritable(ts)); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/timestamp/TestNanoTimeUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/timestamp/TestNanoTimeUtils.java new file mode 100644 index 0000000..491c693 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/timestamp/TestNanoTimeUtils.java @@ -0,0 +1,199 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.parquet.timestamp; + +import org.junit.Assert; +import org.junit.Test; +import java.sql.Timestamp; +import java.util.Calendar; +import java.util.TimeZone; + +public class TestNanoTimeUtils { + + // 3:34:10.101010101 PM on 1 January 2000: + public static final int JAN_1_2000 = 2451545; // according to Wikipedia + public static final long PM_3_34_10_101010101 = + ((15L*60L+34L)*60L+10L)*1000000000L + 101010101L; + public static final NanoTime KNOWN_TIME = new NanoTime( + JAN_1_2000, PM_3_34_10_101010101); + + public static final long KNOWN_IN_MILLIS = 946740850101L; // currentmillis.com + + public static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + public static final TimeZone PST = TimeZone.getTimeZone("PST"); + public static final TimeZone CST = TimeZone.getTimeZone("CST"); + public static final TimeZone PLUS_6 = TimeZone.getTimeZone("GMT+6"); + public static final TimeZone MINUS_6 = TimeZone.getTimeZone("GMT-6"); + + // From Spark's NanoTime implementation + public static final int JULIAN_DAY_OF_EPOCH = 2440588; + public static final long SECONDS_PER_DAY = 60 * 60 * 24L; + public static final long MICROS_PER_SECOND = 1000L * 1000L; + + /** + * Returns the number of microseconds since epoch from Julian day + * and nanoseconds in a day + * + * This is Spark's NanoTime implementation + */ + public long fromJulianDay(int julianDay, long nanoseconds) { + // use Long to avoid rounding errors + long seconds = (((long) julianDay) - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY; + return seconds * MICROS_PER_SECOND + nanoseconds / 1000L; + } + + /** + * Returns a Calendar from number of micros since epoch. + * + * This is a reliable conversion from micros since epoch to local time. 
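+   * Sub-millisecond precision is dropped here, because timestamp_us is truncated to
+   * milliseconds before being handed to the Calendar.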
+ */ + public Calendar toCalendar(long timestamp_us, TimeZone zone) { + Calendar cal = Calendar.getInstance(zone); + cal.setTimeInMillis(timestamp_us / 1000L); + return cal; + } + + @Test + public void testFromJulianDay() { + Assert.assertEquals(KNOWN_IN_MILLIS, + fromJulianDay(JAN_1_2000, PM_3_34_10_101010101) / 1000L); + } + + @Test + public void testKnownTimestampWithFromJulianDay() { + Calendar known = toCalendar(fromJulianDay( + JAN_1_2000, PM_3_34_10_101010101), UTC); + Assert.assertEquals(2000, known.get(Calendar.YEAR)); + Assert.assertEquals(Calendar.JANUARY, known.get(Calendar.MONTH)); + Assert.assertEquals(1, known.get(Calendar.DAY_OF_MONTH)); + Assert.assertEquals(15, known.get(Calendar.HOUR_OF_DAY)); + Assert.assertEquals(34, known.get(Calendar.MINUTE)); + Assert.assertEquals(10, known.get(Calendar.SECOND)); + + // can't validate nanos because Calendar calculations are done in millis + } + + @Test + public void testKnownTimestampWithoutConversion() { + // a UTC calendar will produce the same values as not converting + Calendar calendar = toCalendar(fromJulianDay( + JAN_1_2000, PM_3_34_10_101010101), UTC); + + Timestamp known = NanoTimeUtils.getTimestamp( + KNOWN_TIME, true /* skip conversion from UTC to local */ ); + + Assert.assertEquals(calendar.get(Calendar.YEAR) - 1900, known.getYear()); + Assert.assertEquals(calendar.get(Calendar.MONTH), known.getMonth()); + Assert.assertEquals(calendar.get(Calendar.DAY_OF_MONTH), known.getDate()); + Assert.assertEquals(calendar.get(Calendar.HOUR_OF_DAY), known.getHours()); + Assert.assertEquals(calendar.get(Calendar.MINUTE), known.getMinutes()); + Assert.assertEquals(calendar.get(Calendar.SECOND), known.getSeconds()); + Assert.assertEquals(101010101, known.getNanos()); + + NanoTime actualJD = NanoTimeUtils.getNanoTime(known, true); + + Assert.assertEquals(actualJD.getJulianDay(), JAN_1_2000); + Assert.assertEquals(actualJD.getTimeOfDayNanos(), PM_3_34_10_101010101); + } + + @Test + public void testKnownTimestampWithConversion() { + // a PST calendar will produce the same values when converting to local + Calendar calendar = toCalendar(fromJulianDay( + JAN_1_2000, PM_3_34_10_101010101), PST); // CHANGE ME IF LOCAL IS NOT PST + + Timestamp known = NanoTimeUtils.getTimestamp( + KNOWN_TIME, false /* do not skip conversion from UTC to local */ ); + + Assert.assertEquals(calendar.get(Calendar.YEAR) - 1900, known.getYear()); + Assert.assertEquals(calendar.get(Calendar.MONTH), known.getMonth()); + Assert.assertEquals(calendar.get(Calendar.DAY_OF_MONTH), known.getDate()); + Assert.assertEquals(calendar.get(Calendar.HOUR_OF_DAY), known.getHours()); + Assert.assertEquals(calendar.get(Calendar.MINUTE), known.getMinutes()); + Assert.assertEquals(calendar.get(Calendar.SECOND), known.getSeconds()); + Assert.assertEquals(101010101, known.getNanos()); + + NanoTime actualJD = NanoTimeUtils.getNanoTime(known, false); + + Assert.assertEquals(actualJD.getJulianDay(), JAN_1_2000); + Assert.assertEquals(actualJD.getTimeOfDayNanos(), PM_3_34_10_101010101); + } + + @Test + public void testKnownWithZoneArgumentUTC() { // EXPECTED BEHAVIOR + // the UTC calendar should match the alternative implementation with UTC + Calendar calendar = toCalendar(fromJulianDay( + JAN_1_2000, PM_3_34_10_101010101), UTC); + + Timestamp known = NanoTimeUtils.getTimestamp( + KNOWN_TIME, Calendar.getInstance(UTC)); + + Assert.assertEquals(calendar.get(Calendar.YEAR) - 1900, known.getYear()); + Assert.assertEquals(calendar.get(Calendar.MONTH), known.getMonth()); + 
Assert.assertEquals(calendar.get(Calendar.DAY_OF_MONTH), known.getDate()); + Assert.assertEquals(calendar.get(Calendar.HOUR_OF_DAY), known.getHours()); + Assert.assertEquals(calendar.get(Calendar.MINUTE), known.getMinutes()); + Assert.assertEquals(calendar.get(Calendar.SECOND), known.getSeconds()); + Assert.assertEquals(101010101, known.getNanos()); + + NanoTime actualJD = NanoTimeUtils.getNanoTime(known, Calendar.getInstance(UTC)); + + Assert.assertEquals(actualJD.getJulianDay(), JAN_1_2000); + Assert.assertEquals(actualJD.getTimeOfDayNanos(), PM_3_34_10_101010101); + } + + @Test + public void testKnownWithZoneArgumentGMTP6() { + Calendar calendar = toCalendar(fromJulianDay( + JAN_1_2000, PM_3_34_10_101010101), PLUS_6); + + Timestamp known = NanoTimeUtils.getTimestamp( + KNOWN_TIME, Calendar.getInstance(PLUS_6)); + + Assert.assertEquals(calendar.get(Calendar.YEAR) - 1900, known.getYear()); + Assert.assertEquals(calendar.get(Calendar.MONTH), known.getMonth()); + Assert.assertEquals(calendar.get(Calendar.DAY_OF_MONTH), known.getDate()); + Assert.assertEquals(calendar.get(Calendar.HOUR_OF_DAY), known.getHours()); + Assert.assertEquals(calendar.get(Calendar.MINUTE), known.getMinutes()); + Assert.assertEquals(calendar.get(Calendar.SECOND), known.getSeconds()); + Assert.assertEquals(101010101, known.getNanos()); + + NanoTime actualJD = NanoTimeUtils.getNanoTime(known, Calendar.getInstance(PLUS_6)); + + Assert.assertEquals(actualJD.getJulianDay(), JAN_1_2000); + Assert.assertEquals(actualJD.getTimeOfDayNanos(), PM_3_34_10_101010101); + } + + @Test + public void testKnownWithZoneArgumentGMTM6() { + Calendar calendar = toCalendar(fromJulianDay( + JAN_1_2000, PM_3_34_10_101010101), MINUS_6); + + Timestamp known = NanoTimeUtils.getTimestamp( + KNOWN_TIME, Calendar.getInstance(MINUS_6)); + + Assert.assertEquals(calendar.get(Calendar.YEAR) - 1900, known.getYear()); + Assert.assertEquals(calendar.get(Calendar.MONTH), known.getMonth()); + Assert.assertEquals(calendar.get(Calendar.DAY_OF_MONTH), known.getDate()); + Assert.assertEquals(calendar.get(Calendar.HOUR_OF_DAY), known.getHours()); + Assert.assertEquals(calendar.get(Calendar.MINUTE), known.getMinutes()); + Assert.assertEquals(calendar.get(Calendar.SECOND), known.getSeconds()); + Assert.assertEquals(101010101, known.getNanos()); + + NanoTime actualJD = NanoTimeUtils.getNanoTime(known, Calendar.getInstance(MINUS_6)); + + Assert.assertEquals(actualJD.getJulianDay(), JAN_1_2000); + Assert.assertEquals(actualJD.getTimeOfDayNanos(), PM_3_34_10_101010101); + } +} \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/serde/TestParquetTimestampUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/timestamp/TestParquetTimestampConverter.java similarity index 64% rename from ql/src/test/org/apache/hadoop/hive/ql/io/parquet/serde/TestParquetTimestampUtils.java rename to ql/src/test/org/apache/hadoop/hive/ql/io/parquet/timestamp/TestParquetTimestampConverter.java index ec6def5..9e70148 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/serde/TestParquetTimestampUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/timestamp/TestParquetTimestampConverter.java @@ -11,27 +11,21 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hive.ql.io.parquet.serde; +package org.apache.hadoop.hive.ql.io.parquet.timestamp; import java.sql.Timestamp; import java.util.Calendar; -import java.util.Date; import java.util.GregorianCalendar; import java.util.TimeZone; import java.util.concurrent.TimeUnit; -import junit.framework.Assert; import junit.framework.TestCase; -import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; -import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; - - /** * Tests util-libraries used for parquet-timestamp. */ -public class TestParquetTimestampUtils extends TestCase { +public class TestParquetTimestampConverter extends TestCase { public void testJulianDay() { //check if May 23, 1968 is Julian Day 2440000 @@ -44,10 +38,10 @@ public void testJulianDay() { Timestamp ts = new Timestamp(cal.getTimeInMillis()); NanoTime nt = NanoTimeUtils.getNanoTime(ts, false); - Assert.assertEquals(nt.getJulianDay(), 2440000); + assertEquals(nt.getJulianDay(), 2440000); Timestamp tsFetched = NanoTimeUtils.getTimestamp(nt, false); - Assert.assertEquals(tsFetched, ts); + assertEquals(tsFetched, ts); //check if 30 Julian Days between Jan 1, 2005 and Jan 31, 2005. Calendar cal1 = Calendar.getInstance(); @@ -61,7 +55,7 @@ public void testJulianDay() { NanoTime nt1 = NanoTimeUtils.getNanoTime(ts1, false); Timestamp ts1Fetched = NanoTimeUtils.getTimestamp(nt1, false); - Assert.assertEquals(ts1Fetched, ts1); + assertEquals(ts1Fetched, ts1); Calendar cal2 = Calendar.getInstance(); cal2.set(Calendar.YEAR, 2005); @@ -74,8 +68,8 @@ public void testJulianDay() { NanoTime nt2 = NanoTimeUtils.getNanoTime(ts2, false); Timestamp ts2Fetched = NanoTimeUtils.getTimestamp(nt2, false); - Assert.assertEquals(ts2Fetched, ts2); - Assert.assertEquals(nt2.getJulianDay() - nt1.getJulianDay(), 30); + assertEquals(ts2Fetched, ts2); + assertEquals(nt2.getJulianDay() - nt1.getJulianDay(), 30); //check if 1464305 Julian Days between Jan 1, 2005 BC and Jan 31, 2005. cal1 = Calendar.getInstance(); @@ -90,7 +84,7 @@ public void testJulianDay() { nt1 = NanoTimeUtils.getNanoTime(ts1, false); ts1Fetched = NanoTimeUtils.getTimestamp(nt1, false); - Assert.assertEquals(ts1Fetched, ts1); + assertEquals(ts1Fetched, ts1); cal2 = Calendar.getInstance(); cal2.set(Calendar.YEAR, 2005); @@ -103,8 +97,8 @@ public void testJulianDay() { nt2 = NanoTimeUtils.getNanoTime(ts2, false); ts2Fetched = NanoTimeUtils.getTimestamp(nt2, false); - Assert.assertEquals(ts2Fetched, ts2); - Assert.assertEquals(nt2.getJulianDay() - nt1.getJulianDay(), 1464305); + assertEquals(ts2Fetched, ts2); + assertEquals(nt2.getJulianDay() - nt1.getJulianDay(), 1464305); } public void testNanos() { @@ -122,7 +116,7 @@ public void testNanos() { //(1*60*60 + 1*60 + 1) * 10e9 + 1 NanoTime nt = NanoTimeUtils.getNanoTime(ts, false); - Assert.assertEquals(nt.getTimeOfDayNanos(), 3661000000001L); + assertEquals(nt.getTimeOfDayNanos(), 3661000000001L); //case 2: 23:59:59.999999999 cal = Calendar.getInstance(); @@ -138,7 +132,7 @@ public void testNanos() { //(23*60*60 + 59*60 + 59)*10e9 + 999999999 nt = NanoTimeUtils.getNanoTime(ts, false); - Assert.assertEquals(nt.getTimeOfDayNanos(), 86399999999999L); + assertEquals(nt.getTimeOfDayNanos(), 86399999999999L); //case 3: verify the difference. 
Calendar cal2 = Calendar.getInstance(); @@ -166,12 +160,12 @@ public void testNanos() { NanoTime n2 = NanoTimeUtils.getNanoTime(ts2, false); NanoTime n1 = NanoTimeUtils.getNanoTime(ts1, false); - Assert.assertEquals(n2.getTimeOfDayNanos() - n1.getTimeOfDayNanos(), 600000000009L); + assertEquals(n2.getTimeOfDayNanos() - n1.getTimeOfDayNanos(), 600000000009L); NanoTime n3 = new NanoTime(n1.getJulianDay() - 1, n1.getTimeOfDayNanos() + TimeUnit.DAYS.toNanos(1)); - Assert.assertEquals(ts1, NanoTimeUtils.getTimestamp(n3, false)); + assertEquals(ts1, NanoTimeUtils.getTimestamp(n3, false)); n3 = new NanoTime(n1.getJulianDay() + 3, n1.getTimeOfDayNanos() - TimeUnit.DAYS.toNanos(3)); - Assert.assertEquals(ts1, NanoTimeUtils.getTimestamp(n3, false)); + assertEquals(ts1, NanoTimeUtils.getTimestamp(n3, false)); } public void testTimezone() { @@ -195,69 +189,75 @@ public void testTimezone() { */ NanoTime nt = NanoTimeUtils.getNanoTime(ts, false); long timeOfDayNanos = nt.getTimeOfDayNanos(); - Assert.assertTrue(timeOfDayNanos == 61000000001L || timeOfDayNanos == 3661000000001L); + assertTrue(timeOfDayNanos == 61000000001L || timeOfDayNanos == 3661000000001L); //in both cases, this will be the next day in GMT - Assert.assertEquals(nt.getJulianDay(), 2440001); - } - - public void testTimezoneValues() { - valueTest(false); - } - - public void testTimezonelessValues() { - valueTest(true); + assertEquals(nt.getJulianDay(), 2440001); } public void testTimezoneless() { Timestamp ts1 = Timestamp.valueOf("2011-01-01 00:30:30.111111111"); NanoTime nt1 = NanoTimeUtils.getNanoTime(ts1, true); - Assert.assertEquals(nt1.getJulianDay(), 2455563); - Assert.assertEquals(nt1.getTimeOfDayNanos(), 1830111111111L); + assertEquals(nt1.getJulianDay(), 2455563); + assertEquals(nt1.getTimeOfDayNanos(), 1830111111111L); Timestamp ts1Fetched = NanoTimeUtils.getTimestamp(nt1, true); - Assert.assertEquals(ts1Fetched.toString(), ts1.toString()); + assertEquals(ts1Fetched.toString(), ts1.toString()); Timestamp ts2 = Timestamp.valueOf("2011-02-02 08:30:30.222222222"); NanoTime nt2 = NanoTimeUtils.getNanoTime(ts2, true); - Assert.assertEquals(nt2.getJulianDay(), 2455595); - Assert.assertEquals(nt2.getTimeOfDayNanos(), 30630222222222L); + assertEquals(nt2.getJulianDay(), 2455595); + assertEquals(nt2.getTimeOfDayNanos(), 30630222222222L); Timestamp ts2Fetched = NanoTimeUtils.getTimestamp(nt2, true); - Assert.assertEquals(ts2Fetched.toString(), ts2.toString()); + assertEquals(ts2Fetched.toString(), ts2.toString()); + } + + public void testTimezoneValues() { + // Test with a variety of timezone IDs + valueTest(Calendar.getInstance(TimeZone.getTimeZone("GMT"))); + valueTest(Calendar.getInstance(TimeZone.getTimeZone("CST"))); + valueTest(Calendar.getInstance(TimeZone.getTimeZone("PST"))); + valueTest(Calendar.getInstance(TimeZone.getTimeZone("UTC"))); + valueTest(Calendar.getInstance(TimeZone.getTimeZone("America/Los_Angeles"))); + valueTest(Calendar.getInstance(TimeZone.getTimeZone("US/Pacific"))); + valueTest(Calendar.getInstance(TimeZone.getTimeZone("Etc/GMT+7"))); + valueTest(Calendar.getInstance(TimeZone.getTimeZone("Etc/GMT-1"))); + valueTest(Calendar.getInstance(TimeZone.getTimeZone("Mexico/General"))); + valueTest(Calendar.getInstance(TimeZone.getDefault())); } - private void valueTest(boolean local) { + private void valueTest(Calendar calendar) { //exercise a broad range of timestamps close to the present.
- verifyTsString("2011-01-01 01:01:01.111111111", local); - verifyTsString("2012-02-02 02:02:02.222222222", local); - verifyTsString("2013-03-03 03:03:03.333333333", local); - verifyTsString("2014-04-04 04:04:04.444444444", local); - verifyTsString("2015-05-05 05:05:05.555555555", local); - verifyTsString("2016-06-06 06:06:06.666666666", local); - verifyTsString("2017-07-07 07:07:07.777777777", local); - verifyTsString("2018-08-08 08:08:08.888888888", local); - verifyTsString("2019-09-09 09:09:09.999999999", local); - verifyTsString("2020-10-10 10:10:10.101010101", local); - verifyTsString("2021-11-11 11:11:11.111111111", local); - verifyTsString("2022-12-12 12:12:12.121212121", local); - verifyTsString("2023-01-02 13:13:13.131313131", local); - verifyTsString("2024-02-02 14:14:14.141414141", local); - verifyTsString("2025-03-03 15:15:15.151515151", local); - verifyTsString("2026-04-04 16:16:16.161616161", local); - verifyTsString("2027-05-05 17:17:17.171717171", local); - verifyTsString("2028-06-06 18:18:18.181818181", local); - verifyTsString("2029-07-07 19:19:19.191919191", local); - verifyTsString("2030-08-08 20:20:20.202020202", local); - verifyTsString("2031-09-09 21:21:21.212121212", local); + verifyTsString("2011-01-01 01:01:01.111111111", calendar); + verifyTsString("2012-02-02 02:02:02.222222222", calendar); + verifyTsString("2013-03-03 03:03:03.333333333", calendar); + verifyTsString("2014-04-04 04:04:04.444444444", calendar); + verifyTsString("2015-05-05 05:05:05.555555555", calendar); + verifyTsString("2016-06-06 06:06:06.666666666", calendar); + verifyTsString("2017-07-07 07:07:07.777777777", calendar); + verifyTsString("2018-08-08 08:08:08.888888888", calendar); + verifyTsString("2019-09-09 09:09:09.999999999", calendar); + verifyTsString("2020-10-10 10:10:10.101010101", calendar); + verifyTsString("2021-11-11 11:11:11.111111111", calendar); + verifyTsString("2022-12-12 12:12:12.121212121", calendar); + verifyTsString("2023-01-02 13:13:13.131313131", calendar); + verifyTsString("2024-02-02 14:14:14.141414141", calendar); + verifyTsString("2025-03-03 15:15:15.151515151", calendar); + verifyTsString("2026-04-04 16:16:16.161616161", calendar); + verifyTsString("2027-05-05 17:17:17.171717171", calendar); + verifyTsString("2028-06-06 18:18:18.181818181", calendar); + verifyTsString("2029-07-07 19:19:19.191919191", calendar); + verifyTsString("2030-08-08 20:20:20.202020202", calendar); + verifyTsString("2031-09-09 21:21:21.212121212", calendar); //test some extreme cases. 
- verifyTsString("9999-09-09 09:09:09.999999999", local); - verifyTsString("0001-01-01 00:00:00.0", local); + verifyTsString("9999-09-09 09:09:09.999999999", calendar); + verifyTsString("0001-01-01 00:00:00.0", calendar); } - private void verifyTsString(String tsString, boolean local) { + private void verifyTsString(String tsString, Calendar calendar) { Timestamp ts = Timestamp.valueOf(tsString); - NanoTime nt = NanoTimeUtils.getNanoTime(ts, local); - Timestamp tsFetched = NanoTimeUtils.getTimestamp(nt, local); - Assert.assertEquals(tsString, tsFetched.toString()); + NanoTime nt = NanoTimeUtils.getNanoTime(ts, calendar); + Timestamp tsFetched = NanoTimeUtils.getTimestamp(nt, calendar); + assertEquals(tsString, tsFetched.toString()); } } diff --git ql/src/test/queries/clientpositive/parquet_int96_timestamp.q ql/src/test/queries/clientpositive/parquet_int96_timestamp.q new file mode 100644 index 0000000..e5eb610 --- /dev/null +++ ql/src/test/queries/clientpositive/parquet_int96_timestamp.q @@ -0,0 +1,64 @@ +create table dummy (id int); +insert into table dummy values (1); + +set parquet.mr.int96.enable.utc.write.zone=true; +set hive.parquet.timestamp.skip.conversion=false; + +-- read/write timestamps using UTC as default write zone +create table timestamps (ts timestamp) stored as parquet; +insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1; +select * from timestamps; +describe formatted timestamps; +drop table timestamps; + +-- table property is set. the default should not override it +create table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='PST'); +insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1; +select * from timestamps; +describe formatted timestamps; +drop table timestamps; + +set parquet.mr.int96.enable.utc.write.zone=false; + +-- read/write timestamps using local timezone +create table timestamps (ts timestamp) stored as parquet; +insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1; +select * from timestamps; +describe formatted timestamps; +drop table timestamps; + +-- read/write timestamps with timezone specified in table properties +create table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='CST'); +insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1; +select * from timestamps; +describe formatted timestamps; +drop table timestamps; + +-- read/write timestamps with timezone specified in table properties +create table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='PST'); +insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1; +select * from timestamps; +describe formatted timestamps; +drop table timestamps; + +-- read timestamps written by Impala +create table timestamps (ts timestamp) stored as parquet; +load data local inpath '../../data/files/impala_int96_timestamp.parq' overwrite into table timestamps; +select * from timestamps; +drop table timestamps; + +-- read timestamps written by Impala when table timezone is set (Impala timestamp should not be converted) +create table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='GMT+10'); +load data local inpath '../../data/files/impala_int96_timestamp.parq' overwrite into table timestamps; +select * from timestamps; +drop table timestamps; + +-- CREATE TABLE LIKE will copy the timezone property +create 
table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='GMT+10'); +create table timestamps2 like timestamps; +describe formatted timestamps; +describe formatted timestamps2; +drop table timestamps; +drop table timestamps2; + +drop table if exists dummy; \ No newline at end of file diff --git ql/src/test/queries/clientpositive/parquet_timestamp_conversion.q ql/src/test/queries/clientpositive/parquet_timestamp_conversion.q new file mode 100644 index 0000000..b06a310 --- /dev/null +++ ql/src/test/queries/clientpositive/parquet_timestamp_conversion.q @@ -0,0 +1,13 @@ +set hive.parquet.timestamp.skip.conversion=true; + +create table timestamps (ts timestamp) stored as parquet; +insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1; +select * from timestamps; +drop table timestamps; + +set hive.parquet.timestamp.skip.conversion=false; + +create table timestamps (ts timestamp) stored as parquet; +insert into table timestamps select cast('2017-01-01 01:01:01' as timestamp) limit 1; +select * from timestamps; +drop table timestamps; \ No newline at end of file diff --git ql/src/test/results/clientpositive/parquet_int96_timestamp.q.out ql/src/test/results/clientpositive/parquet_int96_timestamp.q.out new file mode 100644 index 0000000..b9a3664 --- /dev/null +++ ql/src/test/results/clientpositive/parquet_int96_timestamp.q.out @@ -0,0 +1,535 @@ +PREHOOK: query: create table dummy (id int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dummy +POSTHOOK: query: create table dummy (id int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dummy +PREHOOK: query: insert into table dummy values (1) +PREHOOK: type: QUERY +PREHOOK: Output: default@dummy +POSTHOOK: query: insert into table dummy values (1) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@dummy +POSTHOOK: Lineage: dummy.id EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: create table timestamps (ts timestamp) stored as parquet +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@timestamps +POSTHOOK: query: create table timestamps (ts timestamp) stored as parquet +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@timestamps +PREHOOK: query: insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1 +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@timestamps +POSTHOOK: query: insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@timestamps +POSTHOOK: Lineage: timestamps.ts EXPRESSION [] +PREHOOK: query: select * from timestamps +PREHOOK: type: QUERY +PREHOOK: Input: default@timestamps +#### A masked pattern was here #### +POSTHOOK: query: select * from timestamps +POSTHOOK: type: QUERY +POSTHOOK: Input: default@timestamps +#### A masked pattern was here #### +2016-01-01 01:01:01 +PREHOOK: query: describe formatted timestamps +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@timestamps +POSTHOOK: query: describe formatted timestamps +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@timestamps +# col_name data_type comment + +ts timestamp + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 
+#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} + numFiles 1 + numRows 1 + parquet.mr.int96.write.zone UTC + rawDataSize 1 + totalSize 272 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe +InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table timestamps +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@timestamps +PREHOOK: Output: default@timestamps +POSTHOOK: query: drop table timestamps +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@timestamps +POSTHOOK: Output: default@timestamps +PREHOOK: query: create table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='PST') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@timestamps +POSTHOOK: query: create table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='PST') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@timestamps +PREHOOK: query: insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1 +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@timestamps +POSTHOOK: query: insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@timestamps +POSTHOOK: Lineage: timestamps.ts EXPRESSION [] +PREHOOK: query: select * from timestamps +PREHOOK: type: QUERY +PREHOOK: Input: default@timestamps +#### A masked pattern was here #### +POSTHOOK: query: select * from timestamps +POSTHOOK: type: QUERY +POSTHOOK: Input: default@timestamps +#### A masked pattern was here #### +2016-01-01 01:01:01 +PREHOOK: query: describe formatted timestamps +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@timestamps +POSTHOOK: query: describe formatted timestamps +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@timestamps +# col_name data_type comment + +ts timestamp + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} + numFiles 1 + numRows 1 + parquet.mr.int96.write.zone PST + rawDataSize 1 + totalSize 272 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe +InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table timestamps +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@timestamps +PREHOOK: Output: default@timestamps +POSTHOOK: query: drop table timestamps +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@timestamps +POSTHOOK: Output: default@timestamps +PREHOOK: query: create table timestamps (ts timestamp) stored as parquet +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default 
+PREHOOK: Output: default@timestamps +POSTHOOK: query: create table timestamps (ts timestamp) stored as parquet +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@timestamps +PREHOOK: query: insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1 +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@timestamps +POSTHOOK: query: insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@timestamps +POSTHOOK: Lineage: timestamps.ts EXPRESSION [] +PREHOOK: query: select * from timestamps +PREHOOK: type: QUERY +PREHOOK: Input: default@timestamps +#### A masked pattern was here #### +POSTHOOK: query: select * from timestamps +POSTHOOK: type: QUERY +POSTHOOK: Input: default@timestamps +#### A masked pattern was here #### +2016-01-01 01:01:01 +PREHOOK: query: describe formatted timestamps +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@timestamps +POSTHOOK: query: describe formatted timestamps +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@timestamps +# col_name data_type comment + +ts timestamp + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} + numFiles 1 + numRows 1 + rawDataSize 1 + totalSize 272 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe +InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table timestamps +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@timestamps +PREHOOK: Output: default@timestamps +POSTHOOK: query: drop table timestamps +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@timestamps +POSTHOOK: Output: default@timestamps +PREHOOK: query: create table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='CST') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@timestamps +POSTHOOK: query: create table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='CST') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@timestamps +PREHOOK: query: insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1 +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@timestamps +POSTHOOK: query: insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@timestamps +POSTHOOK: Lineage: timestamps.ts EXPRESSION [] +PREHOOK: query: select * from timestamps +PREHOOK: type: QUERY +PREHOOK: Input: default@timestamps +#### A masked pattern was here #### +POSTHOOK: query: select * from timestamps +POSTHOOK: type: QUERY +POSTHOOK: Input: default@timestamps +#### A masked pattern was here #### +2016-01-01 01:01:01 +PREHOOK: query: describe formatted timestamps +PREHOOK: type: DESCTABLE +PREHOOK: Input: 
default@timestamps +POSTHOOK: query: describe formatted timestamps +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@timestamps +# col_name data_type comment + +ts timestamp + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} + numFiles 1 + numRows 1 + parquet.mr.int96.write.zone CST + rawDataSize 1 + totalSize 272 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe +InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table timestamps +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@timestamps +PREHOOK: Output: default@timestamps +POSTHOOK: query: drop table timestamps +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@timestamps +POSTHOOK: Output: default@timestamps +PREHOOK: query: create table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='PST') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@timestamps +POSTHOOK: query: create table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='PST') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@timestamps +PREHOOK: query: insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1 +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@timestamps +POSTHOOK: query: insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@timestamps +POSTHOOK: Lineage: timestamps.ts EXPRESSION [] +PREHOOK: query: select * from timestamps +PREHOOK: type: QUERY +PREHOOK: Input: default@timestamps +#### A masked pattern was here #### +POSTHOOK: query: select * from timestamps +POSTHOOK: type: QUERY +POSTHOOK: Input: default@timestamps +#### A masked pattern was here #### +2016-01-01 01:01:01 +PREHOOK: query: describe formatted timestamps +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@timestamps +POSTHOOK: query: describe formatted timestamps +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@timestamps +# col_name data_type comment + +ts timestamp + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} + numFiles 1 + numRows 1 + parquet.mr.int96.write.zone PST + rawDataSize 1 + totalSize 272 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe +InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table timestamps +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@timestamps +PREHOOK: Output: default@timestamps 
+POSTHOOK: query: drop table timestamps +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@timestamps +POSTHOOK: Output: default@timestamps +PREHOOK: query: create table timestamps (ts timestamp) stored as parquet +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@timestamps +POSTHOOK: query: create table timestamps (ts timestamp) stored as parquet +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@timestamps +PREHOOK: query: load data local inpath '../../data/files/impala_int96_timestamp.parq' overwrite into table timestamps +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@timestamps +POSTHOOK: query: load data local inpath '../../data/files/impala_int96_timestamp.parq' overwrite into table timestamps +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@timestamps +PREHOOK: query: select * from timestamps +PREHOOK: type: QUERY +PREHOOK: Input: default@timestamps +#### A masked pattern was here #### +POSTHOOK: query: select * from timestamps +POSTHOOK: type: QUERY +POSTHOOK: Input: default@timestamps +#### A masked pattern was here #### +2016-01-01 01:01:01 +PREHOOK: query: drop table timestamps +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@timestamps +PREHOOK: Output: default@timestamps +POSTHOOK: query: drop table timestamps +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@timestamps +POSTHOOK: Output: default@timestamps +PREHOOK: query: create table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='GMT+10') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@timestamps +POSTHOOK: query: create table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='GMT+10') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@timestamps +PREHOOK: query: load data local inpath '../../data/files/impala_int96_timestamp.parq' overwrite into table timestamps +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@timestamps +POSTHOOK: query: load data local inpath '../../data/files/impala_int96_timestamp.parq' overwrite into table timestamps +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@timestamps +PREHOOK: query: select * from timestamps +PREHOOK: type: QUERY +PREHOOK: Input: default@timestamps +#### A masked pattern was here #### +POSTHOOK: query: select * from timestamps +POSTHOOK: type: QUERY +POSTHOOK: Input: default@timestamps +#### A masked pattern was here #### +2016-01-01 01:01:01 +PREHOOK: query: drop table timestamps +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@timestamps +PREHOOK: Output: default@timestamps +POSTHOOK: query: drop table timestamps +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@timestamps +POSTHOOK: Output: default@timestamps +PREHOOK: query: create table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='GMT+10') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@timestamps +POSTHOOK: query: create table timestamps (ts timestamp) stored as parquet tblproperties('parquet.mr.int96.write.zone'='GMT+10') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@timestamps +PREHOOK: query: create table timestamps2 like timestamps +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default 
+PREHOOK: Output: default@timestamps2 +POSTHOOK: query: create table timestamps2 like timestamps +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@timestamps2 +PREHOOK: query: describe formatted timestamps +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@timestamps +POSTHOOK: query: describe formatted timestamps +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@timestamps +# col_name data_type comment + +ts timestamp + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} + numFiles 0 + numRows 0 + parquet.mr.int96.write.zone GMT+10 + rawDataSize 0 + totalSize 0 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe +InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: describe formatted timestamps2 +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@timestamps2 +POSTHOOK: query: describe formatted timestamps2 +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@timestamps2 +# col_name data_type comment + +ts timestamp + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} + numFiles 0 + numRows 0 + parquet.mr.int96.write.zone GMT+10 + rawDataSize 0 + totalSize 0 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe +InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table timestamps +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@timestamps +PREHOOK: Output: default@timestamps +POSTHOOK: query: drop table timestamps +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@timestamps +POSTHOOK: Output: default@timestamps +PREHOOK: query: drop table timestamps2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@timestamps2 +PREHOOK: Output: default@timestamps2 +POSTHOOK: query: drop table timestamps2 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@timestamps2 +POSTHOOK: Output: default@timestamps2 +PREHOOK: query: drop table if exists dummy +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@dummy +PREHOOK: Output: default@dummy +POSTHOOK: query: drop table if exists dummy +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@dummy +POSTHOOK: Output: default@dummy diff --git ql/src/test/results/clientpositive/parquet_timestamp_conversion.q.out ql/src/test/results/clientpositive/parquet_timestamp_conversion.q.out new file mode 100644 index 0000000..dc31cbe --- /dev/null +++ ql/src/test/results/clientpositive/parquet_timestamp_conversion.q.out @@ -0,0 +1,68 @@ +PREHOOK: query: create table timestamps (ts timestamp) stored as parquet +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@timestamps +POSTHOOK: query: create 
table timestamps (ts timestamp) stored as parquet +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@timestamps +PREHOOK: query: insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1 +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@timestamps +POSTHOOK: query: insert into table timestamps select cast('2016-01-01 01:01:01' as timestamp) limit 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@timestamps +POSTHOOK: Lineage: timestamps.ts EXPRESSION [] +PREHOOK: query: select * from timestamps +PREHOOK: type: QUERY +PREHOOK: Input: default@timestamps +#### A masked pattern was here #### +POSTHOOK: query: select * from timestamps +POSTHOOK: type: QUERY +POSTHOOK: Input: default@timestamps +#### A masked pattern was here #### +2016-01-01 01:01:01 +PREHOOK: query: drop table timestamps +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@timestamps +PREHOOK: Output: default@timestamps +POSTHOOK: query: drop table timestamps +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@timestamps +POSTHOOK: Output: default@timestamps +PREHOOK: query: create table timestamps (ts timestamp) stored as parquet +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@timestamps +POSTHOOK: query: create table timestamps (ts timestamp) stored as parquet +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@timestamps +PREHOOK: query: insert into table timestamps select cast('2017-01-01 01:01:01' as timestamp) limit 1 +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@timestamps +POSTHOOK: query: insert into table timestamps select cast('2017-01-01 01:01:01' as timestamp) limit 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@timestamps +POSTHOOK: Lineage: timestamps.ts EXPRESSION [] +PREHOOK: query: select * from timestamps +PREHOOK: type: QUERY +PREHOOK: Input: default@timestamps +#### A masked pattern was here #### +POSTHOOK: query: select * from timestamps +POSTHOOK: type: QUERY +POSTHOOK: Input: default@timestamps +#### A masked pattern was here #### +2017-01-01 01:01:01 +PREHOOK: query: drop table timestamps +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@timestamps +PREHOOK: Output: default@timestamps +POSTHOOK: query: drop table timestamps +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@timestamps +POSTHOOK: Output: default@timestamps