diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9d9fdbfae9..37a0a6d3a2 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1976,6 +1976,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION("hive.parquet.timestamp.skip.conversion", false,
       "Current Hive implementation of parquet stores timestamps to UTC, this flag allows skipping of the conversion" +
       "on reading parquet files from other tools"),
+    HIVE_AVRO_TIMESTAMP_SKIP_CONVERSION("hive.avro.timestamp.skip.conversion", false,
+      "Some older Hive implementations (pre-3.1) wrote Avro timestamps in a UTC-normalized " +
+      "manner, while from version 3.1 until now Hive wrote time zone agnostic timestamps. " +
+      "Setting this flag to true will treat legacy timestamps as time zone agnostic. Setting " +
+      "it to false will treat legacy timestamps as UTC-normalized. This flag will not affect " +
+      "timestamps written after this change."),
     HIVE_INT_TIMESTAMP_CONVERSION_IN_SECONDS("hive.int.timestamp.conversion.in.seconds", false,
       "Boolean/tinyint/smallint/int/bigint value is interpreted as milliseconds during the timestamp conversion.\n" +
       "Set this flag to true to interpret the value as seconds to be consistent with float/double." ),
diff --git data/files/avro_historical_timestamp_legacy.avro data/files/avro_historical_timestamp_legacy.avro
new file mode 100644
index 0000000000000000000000000000000000000000..97accd43aabc550f857628d3dfebdf52a8cc0fc8
GIT binary patch
literal 216
zcmeZI%3@>^ODrqO*DFrWNX<>$##F6TQdy9yWTjM;nw(#hqNJmgmzWFUCqkLU1&PTZ
zeoAUuVrfnZP&6$wH7BK5$tt=UrnaOQC=4+tS}CtICr3#KZf;I~UOG@VCqF$iIWZ>$
zVp&OMZfbE!Vs3$MZe~tSX0cLjtPb1;E077bv9%0-yU!em-7EbhvS#kX;vd(WnfRFY
My}bIeAqrgr0Q06&VgLXD

literal 0
HcmV?d00001

diff --git data/files/avro_historical_timestamp_new.avro data/files/avro_historical_timestamp_new.avro
new file mode 100644
index 0000000000000000000000000000000000000000..fe01e5419dc969d4b6f12784455d6c965f3dc7a6
GIT binary patch
literal 245
zcmeZI%3@>@ODrqO*DFrWNX<>$##F6TQdy9yWTjM;nw(#hqNJmgmzWFUCqkLU1&PTZ
zeoAUuVrfnZP&6$wH7BK5$tt=UrnaOQC=4+tS}CtICr3#KZf;I~UOG@VCqF$iIWZ>$
zVp&OMZfbE!Vs3$MZe~tSX0cLjtPb1;E077bv9${2MVTe3MS38`dR6&(sba3BMfnA(
m`a$`*sSLS-*A>}q(;@|uW;M-oH(bcX$F%R|)t3!X=n?=_pH(0L

literal 0
HcmV?d00001

diff --git ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
index 59d3bba534..be7d8b7ca1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
@@ -24,12 +24,14 @@
 import java.io.IOException;
 import java.util.Properties;
+import java.util.TimeZone;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -76,6 +78,8 @@
       dfw.setCodec(factory);
     }
 
+    // add writer.time.zone property to file metadata
+    dfw.setMeta(AvroSerDe.WRITER_TIME_ZONE, TimeZone.getDefault().toZoneId().toString());
     dfw.create(schema, path.getFileSystem(jobConf).create(path));
     return new AvroGenericRecordWriter(dfw);
   }
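With this hunk every Avro file written by Hive carries the writer's zone under the writer.time.zone metadata key. A minimal sketch (not part of the patch; the class name and file-path argument are illustrative) of inspecting that entry with the plain Avro Java API:

import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class PrintWriterTimeZone {
  public static void main(String[] args) throws Exception {
    // Open the container file; only the metadata block is consulted here.
    try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(new File(args[0]), new GenericDatumReader<>())) {
      // Files written after this change return e.g. "Europe/Rome"; older files return null.
      System.out.println(reader.getMetaString("writer.time.zone"));
    }
  }
}

The same entry should also be visible from the command line via avro-tools' getmeta command, assuming a recent avro-tools build.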
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java
index b2bbab9876..1927e0e6e2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java
@@ -19,7 +19,10 @@
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.rmi.server.UID;
+import java.time.DateTimeException;
+import java.time.ZoneId;
 import java.util.Map;
 import java.util.Properties;
 
@@ -30,6 +33,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.FsInput;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
@@ -59,6 +63,7 @@
   final private org.apache.avro.file.FileReader<GenericRecord> reader;
   final private long start;
  final private long stop;
+  private ZoneId writerTimezone;
   protected JobConf jobConf;
   final private boolean isEmptyInput;
   /**
@@ -95,6 +100,8 @@ public AvroGenericRecordReader(JobConf job, FileSplit split, Reporter reporter)
     }
     this.stop = split.getStart() + split.getLength();
     this.recordReaderID = new UID();
+
+    this.writerTimezone = extractWriterTimezoneFromMetadata(job, split, gdr);
   }
 
   /**
@@ -142,6 +149,28 @@ private Schema getSchema(JobConf job, FileSplit split) throws AvroSerdeException
     return null;
   }
 
+  private ZoneId extractWriterTimezoneFromMetadata(JobConf job, FileSplit split,
+      GenericDatumReader<GenericRecord> gdr) throws IOException {
+    if (job == null || gdr == null || split == null || split.getPath() == null) {
+      return null;
+    }
+    try {
+      DataFileReader<GenericRecord> dataFileReader =
+          new DataFileReader<>(new FsInput(split.getPath(), job), gdr);
+      if (dataFileReader.getMeta(AvroSerDe.WRITER_TIME_ZONE) != null) {
+        try {
+          return ZoneId.of(new String(dataFileReader.getMeta(AvroSerDe.WRITER_TIME_ZONE),
+              StandardCharsets.UTF_8));
+        } catch (DateTimeException e) {
+          throw new RuntimeException("Can't parse writer time zone stored in file metadata", e);
+        }
+      }
+    } catch (IOException e) {
+      // Can't access metadata, carry on.
+    }
+    return null;
+  }
+
   private boolean pathIsInPartition(Path split, Path partitionPath) {
     boolean schemeless = split.toUri().getScheme() == null;
     if (schemeless) {
@@ -174,7 +203,7 @@ public NullWritable createKey() {
 
   @Override
   public AvroGenericRecordWritable createValue() {
-    return new AvroGenericRecordWritable();
+    return new AvroGenericRecordWritable(writerTimezone);
   }
 
   @Override
diff --git ql/src/test/queries/clientpositive/avro_historical_timestamp.q ql/src/test/queries/clientpositive/avro_historical_timestamp.q
new file mode 100644
index 0000000000..6d2e1a7316
--- /dev/null
+++ ql/src/test/queries/clientpositive/avro_historical_timestamp.q
@@ -0,0 +1,19 @@
+--These files were created by inserting timestamp '2019-01-01 00:30:30.111111111' into column (ts timestamp) where
+--writer time zone is Europe/Rome.
+
+--older writer: time zone dependent behavior. convert to reader time zone
+create table legacy_table (ts timestamp) stored as avro;
+
+load data local inpath '../../data/files/avro_historical_timestamp_legacy.avro' into table legacy_table;
+
+select * from legacy_table;
+--read legacy timestamps as time zone agnostic
+set hive.avro.timestamp.skip.conversion=true;
+select * from legacy_table;
+
+--newer writer: time zone agnostic behavior. convert to writer time zone (US/Pacific)
+create table new_table (ts timestamp) stored as avro;
+
+load data local inpath '../../data/files/avro_historical_timestamp_new.avro' into table new_table;
+
+select * from new_table;
\ No newline at end of file
diff --git ql/src/test/results/clientpositive/avro_historical_timestamp.q.out ql/src/test/results/clientpositive/avro_historical_timestamp.q.out
new file mode 100644
index 0000000000..ca4e84e615
--- /dev/null
+++ ql/src/test/results/clientpositive/avro_historical_timestamp.q.out
@@ -0,0 +1,59 @@
+PREHOOK: query: create table legacy_table (ts timestamp) stored as avro
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@legacy_table
+POSTHOOK: query: create table legacy_table (ts timestamp) stored as avro
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@legacy_table
+PREHOOK: query: load data local inpath '../../data/files/avro_historical_timestamp_legacy.avro' into table legacy_table
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@legacy_table
+POSTHOOK: query: load data local inpath '../../data/files/avro_historical_timestamp_legacy.avro' into table legacy_table
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@legacy_table
+PREHOOK: query: select * from legacy_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@legacy_table
+#### A masked pattern was here ####
+POSTHOOK: query: select * from legacy_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@legacy_table
+#### A masked pattern was here ####
+2018-12-31 15:30:30.111
+PREHOOK: query: select * from legacy_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@legacy_table
+#### A masked pattern was here ####
+POSTHOOK: query: select * from legacy_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@legacy_table
+#### A masked pattern was here ####
+2018-12-31 23:30:30.111
+PREHOOK: query: create table new_table (ts timestamp) stored as avro
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@new_table
+POSTHOOK: query: create table new_table (ts timestamp) stored as avro
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@new_table
+PREHOOK: query: load data local inpath '../../data/files/avro_historical_timestamp_new.avro' into table new_table
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@new_table
+POSTHOOK: query: load data local inpath '../../data/files/avro_historical_timestamp_new.avro' into table new_table
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@new_table
+PREHOOK: query: select * from new_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@new_table
+#### A masked pattern was here ####
+POSTHOOK: query: select * from new_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@new_table
+#### A masked pattern was here ####
+2019-01-01 00:30:30.111
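The three results above all come from one stored instant, 2018-12-31 23:30:30.111 UTC (the Europe/Rome wall clock 00:30:30.111 normalized to UTC), rendered in three different zones. A self-contained sketch using plain java.time (an assumption for illustration; the patch itself goes through Hive's TimestampTZUtil) reproduces the arithmetic:

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class HistoricalTimestampMath {
  public static void main(String[] args) {
    Instant stored = Instant.parse("2018-12-31T23:30:30.111Z");
    // Legacy file, default behavior: render in the reader zone (US/Pacific in the test VM).
    System.out.println(LocalDateTime.ofInstant(stored, ZoneId.of("US/Pacific")));  // 2018-12-31T15:30:30.111
    // Legacy file with hive.avro.timestamp.skip.conversion=true: keep the UTC wall clock.
    System.out.println(LocalDateTime.ofInstant(stored, ZoneOffset.UTC));           // 2018-12-31T23:30:30.111
    // New file: render in the writer zone recorded in file metadata (Europe/Rome).
    System.out.println(LocalDateTime.ofInstant(stored, ZoneId.of("Europe/Rome"))); // 2019-01-01T00:30:30.111
  }
}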
diff --git ql/src/test/results/clientpositive/avro_schema_evolution_native.q.out ql/src/test/results/clientpositive/avro_schema_evolution_native.q.out
index 69192122ce..3ae8155b9e 100644
--- ql/src/test/results/clientpositive/avro_schema_evolution_native.q.out
+++ ql/src/test/results/clientpositive/avro_schema_evolution_native.q.out
@@ -107,7 +107,7 @@ Table Parameters:
 	numPartitions       	7
 	numRows             	8
 	rawDataSize         	0
-	totalSize           	3098
+	totalSize           	3294
 #### A masked pattern was here ####
 
 # Storage Information
@@ -219,7 +219,7 @@ Table Parameters:
 	numPartitions       	7
 	numRows             	8
 	rawDataSize         	0
-	totalSize           	3098
+	totalSize           	3294
 #### A masked pattern was here ####
 
 # Storage Information
diff --git ql/src/test/results/clientpositive/cbo_ppd_non_deterministic.q.out ql/src/test/results/clientpositive/cbo_ppd_non_deterministic.q.out
index 828dfb0b3d..bd75d7b116 100644
--- ql/src/test/results/clientpositive/cbo_ppd_non_deterministic.q.out
+++ ql/src/test/results/clientpositive/cbo_ppd_non_deterministic.q.out
@@ -121,7 +121,7 @@ STAGE PLANS:
           TableScan
             alias: testa
             filterExpr: ((part1 = 'CA') and (part2 = 'ABC')) (type: boolean)
-            Statistics: Num rows: 2 Data size: 4596 Basic stats: COMPLETE Column stats: COMPLETE
+            Statistics: Num rows: 2 Data size: 4876 Basic stats: COMPLETE Column stats: COMPLETE
             Select Operator
               expressions: rand() (type: double)
               outputColumnNames: _col0
@@ -177,7 +177,7 @@ STAGE PLANS:
           TableScan
             alias: testa
             filterExpr: ((part1 = 'CA') and (part2 = 'ABC')) (type: boolean)
-            Statistics: Num rows: 2 Data size: 4596 Basic stats: COMPLETE Column stats: COMPLETE
+            Statistics: Num rows: 2 Data size: 4876 Basic stats: COMPLETE Column stats: COMPLETE
             Select Operator
               expressions: rand() (type: double)
               outputColumnNames: _col0
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
index 8cdc567dee..db8db1c922 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
@@ -22,11 +22,14 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.rmi.server.UID;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.TimeZone;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
@@ -40,8 +43,11 @@
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.UnresolvedUnionException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.Date;
 import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.common.type.TimestampTZUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.type.HiveChar;
@@ -76,6 +82,20 @@
    * record encoding.
    */
   private boolean warnedOnce = false;
+
+  /**
+   * Time zone in which file was written, which may be stored in metadata.
+   */
+  private ZoneId writerTimezone = null;
+
+  private Configuration configuration = null;
+
+  AvroDeserializer() {}
+
+  AvroDeserializer(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
   /**
    * When encountering a record with an older schema than the one we're trying
    * to read, it is necessary to re-encode with a reader against the newer schema.
@@ -148,6 +168,7 @@ public Object deserialize(List<String> columnNames, List<TypeInfo> columnTypes,
     AvroGenericRecordWritable recordWritable = (AvroGenericRecordWritable) writable;
     GenericRecord r = recordWritable.getRecord();
     Schema fileSchema = recordWritable.getFileSchema();
+    writerTimezone = recordWritable.getWriterTimezone();
     UID recordReaderId = recordWritable.getRecordReaderID();
 
     //If the record reader (from which the record is originated) is already seen and valid,
@@ -301,7 +322,29 @@ private Object deserializePrimitive(Object datum, Schema fileSchema, Schema reco
         throw new AvroSerdeException(
             "Unexpected Avro schema for Date TypeInfo: " + recordSchema.getType());
       }
-      return Timestamp.ofEpochMilli((Long)datum);
+      // If a time zone is found in file metadata (property name: writer.time.zone), convert the
+      // timestamp to that (writer) time zone in order to emulate time zone agnostic behavior.
+      // If not, then the file was written by an older version of hive, so we convert the timestamp
+      // to the server's (reader) time zone for backwards compatibility reasons - unless the
+      // session level configuration hive.avro.timestamp.skip.conversion is set to true, in which
+      // case we assume it was written by a time zone agnostic writer, so we don't convert it.
+      boolean skipConversion;
+      if (configuration != null) {
+        skipConversion = HiveConf.getBoolVar(
+            configuration, HiveConf.ConfVars.HIVE_AVRO_TIMESTAMP_SKIP_CONVERSION);
+      } else {
+        skipConversion = HiveConf.ConfVars.HIVE_AVRO_TIMESTAMP_SKIP_CONVERSION.defaultBoolVal;
+      }
+      ZoneId convertToTimeZone;
+      if (writerTimezone != null) {
+        convertToTimeZone = writerTimezone;
+      } else if (skipConversion) {
+        convertToTimeZone = ZoneOffset.UTC;
+      } else {
+        convertToTimeZone = TimeZone.getDefault().toZoneId();
+      }
+      Timestamp timestamp = Timestamp.ofEpochMilli((Long)datum);
+      return TimestampTZUtil.convertTimestampToZone(timestamp, ZoneOffset.UTC, convertToTimeZone);
     default:
       return datum;
     }
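The zone-selection logic above picks exactly one target zone per file: the writer zone from metadata when present, otherwise UTC (a no-op) or the reader zone depending on the flag. Assuming convertTimestampToZone re-renders a wall-clock Timestamp from one zone in another, the metadata path is equivalent to this java.time sketch (values mirror the unit test further down):

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class ReadPathSketch {
  public static void main(String[] args) {
    long datum = 1546387200999L;                              // 2019-01-02T00:00:00.999Z as stored
    ZoneId convertToTimeZone = ZoneId.of("America/New_York"); // writer.time.zone from metadata
    // Re-render the UTC instant as a wall clock in the writer zone.
    LocalDateTime wallClock =
        LocalDateTime.ofInstant(Instant.ofEpochMilli(datum), convertToTimeZone);
    System.out.println(wallClock);                            // 2019-01-01T19:00:00.999
  }
}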
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java
index 2f0ba10669..095197c2ed 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.rmi.server.UID;
+import java.time.ZoneId;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
@@ -48,6 +49,9 @@
   // Schema that exists in the Avro data file.
   private Schema fileSchema;
 
+  // Time zone file was written in, from metadata
+  private ZoneId writerTimezone = null;
+
   /**
    * Unique Id determine which record reader created this record
    */
@@ -74,6 +78,10 @@ public AvroGenericRecordWritable(GenericRecord record) {
     this.record = record;
   }
 
+  public AvroGenericRecordWritable(ZoneId writerTimezone) {
+    this.writerTimezone = writerTimezone;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     // Write schema since we need it to pull the data out. (see point #1 above)
@@ -141,4 +149,8 @@ public Schema getFileSchema() {
   public void setFileSchema(Schema originalSchema) {
     this.fileSchema = originalSchema;
   }
+
+  public ZoneId getWriterTimezone() {
+    return writerTimezone;
+  }
 }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
index 3955611733..653f5912fe 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -59,6 +59,7 @@
   public static final String VARCHAR_TYPE_NAME = "varchar";
   public static final String DATE_TYPE_NAME = "date";
   public static final String TIMESTAMP_TYPE_NAME = "timestamp-millis";
+  public static final String WRITER_TIME_ZONE = "writer.time.zone";
   public static final String AVRO_PROP_LOGICAL_TYPE = "logicalType";
   public static final String AVRO_PROP_PRECISION = "precision";
   public static final String AVRO_PROP_SCALE = "scale";
@@ -139,7 +140,7 @@ public void initialize(Configuration configuration, Properties properties) throw
 
     if(!badSchema) {
       this.avroSerializer = new AvroSerializer();
-      this.avroDeserializer = new AvroDeserializer();
+      this.avroDeserializer = new AvroDeserializer(configuration);
     }
   }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
index 99a0b9a487..4331c11398 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hive.serde2.avro;
 
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
@@ -34,6 +36,7 @@
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.common.type.TimestampTZUtil;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
@@ -211,6 +214,8 @@ private Object serializePrimitive(TypeInfo typeInfo, PrimitiveObjectInspector fi
     case TIMESTAMP:
       Timestamp timestamp =
           ((TimestampObjectInspector) fieldOI).getPrimitiveJavaObject(structFieldData);
+      timestamp = TimestampTZUtil.convertTimestampToZone(
+          timestamp, TimeZone.getDefault().toZoneId(), ZoneOffset.UTC);
       return timestamp.toEpochMilli();
     case UNKNOWN:
       throw new AvroSerdeException("Received UNKNOWN primitive category.");
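The write path is the mirror image of the read path: the Timestamp's wall clock, taken in the JVM default zone, is re-rendered in UTC before toEpochMilli(), so the long stored in the file is always UTC-normalized. A hedged java.time equivalent (assuming a Europe/Rome default zone, as in the q-file above):

import java.time.LocalDateTime;
import java.time.ZoneId;

public class WritePathSketch {
  public static void main(String[] args) {
    LocalDateTime wallClock = LocalDateTime.parse("2019-01-01T00:30:30.111");
    // Interpret the wall clock in the writer zone and take UTC epoch millis.
    long epochMilli = wallClock.atZone(ZoneId.of("Europe/Rome")).toInstant().toEpochMilli();
    System.out.println(epochMilli); // 1546299030111 = 2018-12-31T23:30:30.111Z
  }
}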
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
index ef97d2dd83..1cd03f7368 100644
--- serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.rmi.server.UID;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -32,6 +33,7 @@
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
@@ -41,6 +43,8 @@
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -270,6 +274,54 @@ public void canDeserializeSingleItemUnions() throws SerDeException, IOException
     assertEquals(0, uoi.getTag(result.unionObject));
   }
 
+  /**
+   * Test whether Avro timestamps can be deserialized according to new behavior (storage in UTC but
+   * LocalDateTime semantics as timestamps are converted back to the writer time zone) as well as
+   * old behavior (Instant semantics).
+   */
+  @Test
+  public void canDeserializeTimestamps() throws SerDeException, IOException {
+    List<String> columnNames = new ArrayList<>();
+    columnNames.add("timestampField");
+    List<TypeInfo> columnTypes = new ArrayList<>();
+    columnTypes.add(TypeInfoFactory.getPrimitiveTypeInfo("timestamp"));
+    Schema readerSchema =
+        AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.TIMESTAMP_SCHEMA);
+
+    // 2019-01-02 00:00:00 GMT is 1546387200000 milliseconds after epoch
+    GenericData.Record record = new GenericData.Record(readerSchema);
+    record.put("timestampField", 1546387200999L);
+    assertTrue(GENERIC_DATA.validate(readerSchema, record));
+
+    AvroGenericRecordWritable agrw = new AvroGenericRecordWritable(ZoneId.of("America/New_York"));
+    agrw.setRecord(record);
+    agrw.setFileSchema(readerSchema);
+    agrw.setRecordReaderID(new UID());
+
+    AvroDeserializer deserializer = new AvroDeserializer();
+    ArrayList<Object> row =
+        (ArrayList<Object>) deserializer.deserialize(columnNames, columnTypes, agrw, readerSchema);
+    Timestamp resultTimestamp = (Timestamp) row.get(0);
+
+    // 2019-01-02 00:00:00 GMT is 2019-01-01 19:00:00 GMT-0500 (America/New_York / EST)
+    assertEquals(Timestamp.valueOf("2019-01-01 19:00:00.999"), resultTimestamp);
+
+    // Do the same without specifying writer time zone. This tests deserialization of older records
+    // which should be interpreted in Instant semantics
+    AvroGenericRecordWritable agrw2 = new AvroGenericRecordWritable();
+    agrw2.setRecord(record);
+    agrw2.setFileSchema(readerSchema);
+    agrw2.setRecordReaderID(new UID());
+
+    row =
+        (ArrayList<Object>) deserializer.deserialize(columnNames, columnTypes, agrw2, readerSchema);
+    resultTimestamp = (Timestamp) row.get(0);
+
+    // 2019-01-02 00:00:00 GMT is 2019-01-01 16:00:00 in zone GMT-0800 (PST)
+    // This is the time zone for VM in test.
+    assertEquals(Timestamp.valueOf("2019-01-01 16:00:00.999"), resultTimestamp);
+  }
+
   @Test
   public void canDeserializeUnions() throws SerDeException, IOException {
     Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.UNION_SCHEMA);
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
index ee83ba360b..b500c6eaaa 100644
--- serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
@@ -222,6 +222,16 @@
       "  ]\n" +
       "}";
 
+  public static final String TIMESTAMP_SCHEMA = "{\n" +
+      "  \"type\": \"record\", \n" +
+      "  \"name\": \"timestampTest\",\n" +
+      "  \"fields\" : [\n" +
+      "    {\"name\":\"timestampField\", " +
+      "     \"type\":\"" + AvroSerDe.AVRO_LONG_TYPE_NAME + "\", " +
+      "     \"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME + "\"}" +
+      "  ]\n" +
+      "}";
+
   public static final String KITCHEN_SINK_SCHEMA = "{\n" +
       "  \"namespace\": \"org.apache.hadoop.hive\",\n" +
       "  \"name\": \"kitchsink\",\n" +
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java
index 93eafc11fc..bcd0fd1acf 100644
--- serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java
@@ -25,6 +25,7 @@
 import org.apache.avro.generic.GenericEnumSymbol;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveDecimalV1;
+import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -120,6 +121,13 @@ public void canSerializeDoubles() throws SerDeException, IOException {
     singleFieldTest("double1", 24.00000001, "\"double\"");
   }
 
+  @Test
+  public void canSerializeTimestamps() throws SerDeException, IOException {
+    singleFieldTest("timestamp1", Timestamp.valueOf("2011-01-01 00:00:00").toEpochMilli(),
+        "\"" + AvroSerDe.AVRO_LONG_TYPE_NAME + "\"," +
+        "\"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME + "\"");
+  }
+
   @Test
   public void canSerializeDecimals() throws SerDeException, IOException {
     ByteBuffer bb = ByteBuffer.wrap(HiveDecimal.create("3.1416").bigIntegerBytes());
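For reference, TIMESTAMP_SCHEMA above concatenates to the following Avro schema (assuming AVRO_LONG_TYPE_NAME resolves to "long"; TIMESTAMP_TYPE_NAME is "timestamp-millis" per the AvroSerDe hunk earlier):

{
  "type": "record",
  "name": "timestampTest",
  "fields" : [
    {"name":"timestampField", "type":"long", "logicalType":"timestamp-millis"}
  ]
}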