diff --git a/data/files/avro_historical_timestamp_legacy.avro b/data/files/avro_historical_timestamp_legacy.avro new file mode 100644 index 0000000000000000000000000000000000000000..97accd43aabc550f857628d3dfebdf52a8cc0fc8 GIT binary patch literal 216 zcmeZI%3@>^ODrqO*DFrWNX<>$##F6TQdy9yWTjM;nw(#hqNJmgmzWFUCqkLU1&PTZ zeoAUuVrfnZP&6$wH7BK5$tt=UrnaOQC=4+tS}CtICr3#KZf;I~UOG@VCqF$iIWZ>$ zVp&OMZfbE!Vs3$MZe~tSX0cLjtPb1;E077bv9%0-yU!em-7EbhvS#kX;vd(WnfRFY My}bIeAqrgr0Q06&VgLXD literal 0 HcmV?d00001 diff --git a/data/files/avro_historical_timestamp_new.avro b/data/files/avro_historical_timestamp_new.avro new file mode 100644 index 0000000000000000000000000000000000000000..fe01e5419dc969d4b6f12784455d6c965f3dc7a6 GIT binary patch literal 245 zcmeZI%3@>@ODrqO*DFrWNX<>$##F6TQdy9yWTjM;nw(#hqNJmgmzWFUCqkLU1&PTZ zeoAUuVrfnZP&6$wH7BK5$tt=UrnaOQC=4+tS}CtICr3#KZf;I~UOG@VCqF$iIWZ>$ zVp&OMZfbE!Vs3$MZe~tSX0cLjtPb1;E077bv9${2MVTe3MS38`dR6&(sba3BMfnA( m`a$`*sSLS-*A>}q(;@|uW;M-oH(bcX$F%R|)t3!X=n?=_pH(0L literal 0 HcmV?d00001 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java index 59d3bba534..be7d8b7ca1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java @@ -24,12 +24,14 @@ import java.io.IOException; import java.util.Properties; +import java.util.TimeZone; import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.hive.serde2.avro.AvroSerDe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; @@ -76,6 +78,8 @@ dfw.setCodec(factory); } + // add writer.time.zone property to file metadata + dfw.setMeta(AvroSerDe.WRITER_TIME_ZONE, TimeZone.getDefault().toZoneId().toString()); dfw.create(schema, path.getFileSystem(jobConf).create(path)); return new AvroGenericRecordWriter(dfw); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java index b2bbab9876..6fdb690288 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java @@ -19,7 +19,10 @@ import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.rmi.server.UID; +import java.time.DateTimeException; +import java.time.ZoneId; import java.util.Map; import java.util.Properties; @@ -30,6 +33,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.mapred.FsInput; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.serde2.avro.AvroSerDe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; @@ -59,6 +63,7 @@ final private org.apache.avro.file.FileReader reader; final private long start; final private long stop; + private ZoneId writerTimezone; protected JobConf jobConf; final private boolean isEmptyInput; /** @@ -95,6 +100,8 @@ public AvroGenericRecordReader(JobConf job, FileSplit split, Reporter reporter) } this.stop = split.getStart() + split.getLength(); this.recordReaderID = new UID(); + + this.writerTimezone = extractWriterTimezoneFromMetadata(job, split, gdr); } /** @@ -142,6 +149,21 @@ private 
Schema getSchema(JobConf job, FileSplit split) throws AvroSerdeException
     return null;
   }
 
+  /** Reads the writer.time.zone key (if present) from the Avro file header metadata. */
+  private ZoneId extractWriterTimezoneFromMetadata(JobConf job, FileSplit split,
+      GenericDatumReader<GenericRecord> gdr) throws IOException {
+    try (DataFileReader<GenericRecord> dataFileReader =
+        new DataFileReader<>(new FsInput(split.getPath(), job), gdr)) {
+      byte[] writerTimeZone = dataFileReader.getMeta(AvroSerDe.WRITER_TIME_ZONE);
+      if (writerTimeZone != null) {
+        return ZoneId.of(new String(writerTimeZone, StandardCharsets.UTF_8));
+      }
+      return null;
+    } catch (DateTimeException e) {
+      throw new RuntimeException("Can't parse writer time zone stored in file metadata", e);
+    }
+  }
+
   private boolean pathIsInPartition(Path split, Path partitionPath) {
     boolean schemeless = split.toUri().getScheme() == null;
     if (schemeless) {
@@ -174,7 +196,7 @@ public NullWritable createKey() {
 
   @Override
   public AvroGenericRecordWritable createValue() {
-    return new AvroGenericRecordWritable();
+    return new AvroGenericRecordWritable(writerTimezone);
   }
 
   @Override
diff --git a/ql/src/test/queries/clientpositive/avro_historical_timestamp.q b/ql/src/test/queries/clientpositive/avro_historical_timestamp.q
new file mode 100644
index 0000000000..c3af54a4ca
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/avro_historical_timestamp.q
@@ -0,0 +1,17 @@
+--These files were created by inserting timestamp '2019-01-01 00:30:30.111111111' into column (ts timestamp) where
+--writer time zone is Europe/Rome.
+
+--older writer: time zone dependent behavior. convert to reader time zone (US/Pacific when these tests run)
+create table legacy_table (ts timestamp) stored as avro;
+
+load data local inpath '../../data/files/avro_historical_timestamp_legacy.avro' into table legacy_table;
+
+select * from legacy_table;
+
+
+--newer writer: time zone agnostic behavior. convert to writer time zone (Europe/Rome)
+create table new_table (ts timestamp) stored as avro;
+
+load data local inpath '../../data/files/avro_historical_timestamp_new.avro' into table new_table;
+
+select * from new_table;
\ No newline at end of file
diff --git a/ql/src/test/results/clientpositive/avro_historical_timestamp.q.out b/ql/src/test/results/clientpositive/avro_historical_timestamp.q.out
new file mode 100644
index 0000000000..ffdf47dd8f
--- /dev/null
+++ b/ql/src/test/results/clientpositive/avro_historical_timestamp.q.out
@@ -0,0 +1,50 @@
+PREHOOK: query: create table legacy_table (ts timestamp) stored as avro
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@legacy_table
+POSTHOOK: query: create table legacy_table (ts timestamp) stored as avro
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@legacy_table
+PREHOOK: query: load data local inpath '../../data/files/avro_historical_timestamp_legacy.avro' into table legacy_table
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@legacy_table
+POSTHOOK: query: load data local inpath '../../data/files/avro_historical_timestamp_legacy.avro' into table legacy_table
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@legacy_table
+PREHOOK: query: select * from legacy_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@legacy_table
+#### A masked pattern was here ####
+POSTHOOK: query: select * from legacy_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@legacy_table
+#### A masked pattern was here ####
+2018-12-31 15:30:30.111
+PREHOOK: query: create table new_table (ts timestamp) stored as avro
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@new_table
+POSTHOOK: query: create table new_table (ts timestamp) stored as avro
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@new_table
+PREHOOK: query: load data local inpath '../../data/files/avro_historical_timestamp_new.avro' into table new_table
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@new_table
+POSTHOOK: query: load data local inpath '../../data/files/avro_historical_timestamp_new.avro' into table new_table
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@new_table
+PREHOOK: query: select * from new_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@new_table
+#### A masked pattern was here ####
+POSTHOOK: query: select * from new_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@new_table
+#### A masked pattern was here ####
+2019-01-01 00:30:30.111
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
index 8cdc567dee..e62c5ade0d 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
@@ -22,11 +22,14 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.rmi.server.UID;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.TimeZone;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
@@ -42,6 +45,7 @@
 import org.apache.avro.UnresolvedUnionException;
 import 
org.apache.hadoop.hive.common.type.Date; import org.apache.hadoop.hive.common.type.Timestamp; +import org.apache.hadoop.hive.common.type.TimestampTZUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.type.HiveChar; @@ -76,6 +80,12 @@ * record encoding. */ private boolean warnedOnce = false; + + /** + * Time zone in which file was written, which may be stored in metadata. + */ + private ZoneId writerTimezone = null; + /** * When encountering a record with an older schema than the one we're trying * to read, it is necessary to re-encode with a reader against the newer schema. @@ -148,6 +158,7 @@ public Object deserialize(List columnNames, List columnTypes, AvroGenericRecordWritable recordWritable = (AvroGenericRecordWritable) writable; GenericRecord r = recordWritable.getRecord(); Schema fileSchema = recordWritable.getFileSchema(); + writerTimezone = recordWritable.getWriterTimezone(); UID recordReaderId = recordWritable.getRecordReaderID(); //If the record reader (from which the record is originated) is already seen and valid, @@ -301,7 +312,14 @@ private Object deserializePrimitive(Object datum, Schema fileSchema, Schema reco throw new AvroSerdeException( "Unexpected Avro schema for Date TypeInfo: " + recordSchema.getType()); } - return Timestamp.ofEpochMilli((Long)datum); + // If a time zone is found in file metadata (property name: writer.time.zone), convert the + // timestamp to that (writer) time zone in order to emulate time zone agnostic behavior. + // If not, then the file was written by an older version of hive, so we convert the timestamp + // to the server's (reader) time zone for backwards compatibility reasons. + ZoneId convertToTimeZone = + writerTimezone != null ? writerTimezone : TimeZone.getDefault().toZoneId(); + Timestamp timestamp = Timestamp.ofEpochMilli((Long)datum); + return TimestampTZUtil.convertTimestampToZone(timestamp, ZoneOffset.UTC, convertToTimeZone); default: return datum; } diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java index 2f0ba10669..095197c2ed 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.InputStream; import java.rmi.server.UID; +import java.time.ZoneId; import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; @@ -48,6 +49,9 @@ // Schema that exists in the Avro data file. private Schema fileSchema; + // Time zone file was written in, from metadata + private ZoneId writerTimezone = null; + /** * Unique Id determine which record reader created this record */ @@ -74,6 +78,10 @@ public AvroGenericRecordWritable(GenericRecord record) { this.record = record; } + public AvroGenericRecordWritable(ZoneId writerTimezone) { + this.writerTimezone = writerTimezone; + } + @Override public void write(DataOutput out) throws IOException { // Write schema since we need it to pull the data out. 
(see point #1 above)
@@ -141,4 +149,8 @@ public Schema getFileSchema() {
   public void setFileSchema(Schema originalSchema) {
     this.fileSchema = originalSchema;
   }
+
+  public ZoneId getWriterTimezone() {
+    return writerTimezone;
+  }
 }
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
index 3955611733..2e12d6c1e1 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -59,6 +59,7 @@
   public static final String VARCHAR_TYPE_NAME = "varchar";
   public static final String DATE_TYPE_NAME = "date";
   public static final String TIMESTAMP_TYPE_NAME = "timestamp-millis";
+  public static final String WRITER_TIME_ZONE = "writer.time.zone";
   public static final String AVRO_PROP_LOGICAL_TYPE = "logicalType";
   public static final String AVRO_PROP_PRECISION = "precision";
   public static final String AVRO_PROP_SCALE = "scale";
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
index 99a0b9a487..5a62e111b0 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hive.serde2.avro;
 
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
@@ -34,6 +36,7 @@
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.common.type.TimestampTZUtil;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
@@ -211,6 +214,10 @@ private Object serializePrimitive(TypeInfo typeInfo, PrimitiveObjectInspector fi
     case TIMESTAMP:
       Timestamp timestamp =
         ((TimestampObjectInspector) fieldOI).getPrimitiveJavaObject(structFieldData);
+      // Store the value in UTC; AvroContainerOutputFormat records the writer's time zone in the
+      // file metadata (writer.time.zone), so readers can restore the original wall-clock value.
+      timestamp = TimestampTZUtil.convertTimestampToZone(
+          timestamp, TimeZone.getDefault().toZoneId(), ZoneOffset.UTC);
       return timestamp.toEpochMilli();
     case UNKNOWN:
       throw new AvroSerdeException("Received UNKNOWN primitive category.");
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
index ef97d2dd83..40e08917f4 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.rmi.server.UID;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -32,6 +33,7 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 
+import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
@@ -41,6 +43,8 @@
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.junit.Assert;
 import org.junit.Test;
@@ -270,6 +274,54 @@ public void canDeserializeSingleItemUnions() throws SerDeException, IOException
     assertEquals(0, uoi.getTag(result.unionObject));
   }
 
+  /**
+   * Tests that Avro timestamps are deserialized with the new behavior (stored in UTC and
+   * converted back to the writer time zone, i.e. LocalDateTime semantics) as well as with the
+   * old behavior (converted to the reader time zone, i.e. Instant semantics).
+   */
+  @Test
+  public void canDeserializeTimestamps() throws SerDeException, IOException {
+    List<String> columnNames = new ArrayList<>();
+    columnNames.add("timestampField");
+    List<TypeInfo> columnTypes = new ArrayList<>();
+    columnTypes.add(TypeInfoFactory.getPrimitiveTypeInfo("timestamp"));
+    Schema readerSchema =
+        AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.TIMESTAMP_SCHEMA);
+
+    // 2019-01-02 00:00:00 GMT is 1546387200000 ms after the epoch; we store 999 ms past that
+    GenericData.Record record = new GenericData.Record(readerSchema);
+    record.put("timestampField", 1546387200999L);
+    assertTrue(GENERIC_DATA.validate(readerSchema, record));
+
+    AvroGenericRecordWritable agrw = new AvroGenericRecordWritable(ZoneId.of("America/New_York"));
+    agrw.setRecord(record);
+    agrw.setFileSchema(readerSchema);
+    agrw.setRecordReaderID(new UID());
+
+    AvroDeserializer deserializer = new AvroDeserializer();
+    ArrayList<Object> row =
+        (ArrayList<Object>) deserializer.deserialize(columnNames, columnTypes, agrw, readerSchema);
+    Timestamp resultTimestamp = (Timestamp) row.get(0);
+
+    // 2019-01-02 00:00:00 GMT is 2019-01-01 19:00:00 GMT-0500 (America/New_York / EST)
+    assertEquals(Timestamp.valueOf("2019-01-01 19:00:00.999"), resultTimestamp);
+
+    // Do the same without specifying a writer time zone. This covers deserialization of older
+    // records, which should be interpreted with Instant semantics
+    AvroGenericRecordWritable agrw2 = new AvroGenericRecordWritable();
+    agrw2.setRecord(record);
+    agrw2.setFileSchema(readerSchema);
+    agrw2.setRecordReaderID(new UID());
+
+    row =
+        (ArrayList<Object>) deserializer.deserialize(columnNames, columnTypes, agrw2, readerSchema);
+    resultTimestamp = (Timestamp) row.get(0);
+
+    // 2019-01-02 00:00:00 GMT is 2019-01-01 16:00:00 in zone GMT-0800 (PST)
+    // US/Pacific is the default time zone of the VM running these tests.
+ assertEquals(Timestamp.valueOf("2019-01-01 16:00:00.999"), resultTimestamp); + } + @Test public void canDeserializeUnions() throws SerDeException, IOException { Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.UNION_SCHEMA); diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java index ee83ba360b..b500c6eaaa 100644 --- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java +++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java @@ -222,6 +222,16 @@ " ]\n" + "}"; + public static final String TIMESTAMP_SCHEMA = "{\n" + + " \"type\": \"record\", \n" + + " \"name\": \"timestampTest\",\n" + + " \"fields\" : [\n" + + " {\"name\":\"timestampField\", " + + " \"type\":\"" + AvroSerDe.AVRO_LONG_TYPE_NAME + "\", " + + " \"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME + "\"}" + + " ]\n" + + "}"; + public static final String KITCHEN_SINK_SCHEMA = "{\n" + " \"namespace\": \"org.apache.hadoop.hive\",\n" + " \"name\": \"kitchsink\",\n" + diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java index 93eafc11fc..bcd0fd1acf 100644 --- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java +++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java @@ -25,6 +25,7 @@ import org.apache.avro.generic.GenericEnumSymbol; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.HiveDecimalV1; +import org.apache.hadoop.hive.common.type.Timestamp; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; @@ -120,6 +121,13 @@ public void canSerializeDoubles() throws SerDeException, IOException { singleFieldTest("double1", 24.00000001, "\"double\""); } + @Test + public void canSerializeTimestamps() throws SerDeException, IOException { + singleFieldTest("timestamp1", Timestamp.valueOf("2011-01-01 00:00:00").toEpochMilli(), + "\"" + AvroSerDe.AVRO_LONG_TYPE_NAME + "\"," + + "\"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME + "\""); + } + @Test public void canSerializeDecimals() throws SerDeException, IOException { ByteBuffer bb = ByteBuffer.wrap(HiveDecimal.create("3.1416").bigIntegerBytes());
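
A note on the mechanism: the patch rides on a stock Avro feature, user metadata in the container file header. The sketch below is not part of the patch; the class name, record schema, and file path are invented for illustration. It shows the same round trip with nothing but Avro's public API: DataFileWriter.setMeta() before create() on the write side, mirroring AvroContainerOutputFormat, and getMeta()/getMetaString() on the read side, mirroring extractWriterTimezoneFromMetadata(). A file written before this patch simply lacks the key, so the lookup returns null.

import java.io.File;
import java.io.IOException;
import java.util.TimeZone;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class WriterTimeZoneMetaDemo {
  // Same key that AvroSerDe.WRITER_TIME_ZONE defines.
  private static final String WRITER_TIME_ZONE = "writer.time.zone";

  public static void main(String[] args) throws IOException {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"ts\",\"type\":\"long\"}]}");
    File file = new File("ts_demo.avro"); // hypothetical scratch file

    // Write side: metadata must be set before create(), as in AvroContainerOutputFormat.
    GenericRecord rec = new GenericData.Record(schema);
    rec.put("ts", 1546387200999L);
    try (DataFileWriter<GenericRecord> writer =
        new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
      writer.setMeta(WRITER_TIME_ZONE, TimeZone.getDefault().toZoneId().toString());
      writer.create(schema, file);
      writer.append(rec);
    }

    // Read side: a file written by an older Hive has no such key, and getMetaString returns null.
    try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(file, new GenericDatumReader<GenericRecord>())) {
      System.out.println("writer.time.zone = " + reader.getMetaString(WRITER_TIME_ZONE));
    }
  }
}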
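Separately, the conversion arithmetic that the new AvroDeserializer code and canDeserializeTimestamps() rely on can be reproduced with plain java.time. This sketch is illustrative only: it stands in for Hive's TimestampTZUtil.convertTimestampToZone(), with the reader zone hard-coded to US/Pacific the way the Hive test VMs are configured.

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class TimestampSemanticsDemo {
  public static void main(String[] args) {
    // The long actually stored in the Avro file: 2019-01-02 00:00:00.999 UTC.
    long epochMillis = 1546387200999L;

    // New files: writer.time.zone is present, so render the instant in the writer's zone,
    // recovering the wall-clock value that was originally inserted (LocalDateTime semantics).
    ZoneId writerZone = ZoneId.of("America/New_York");
    System.out.println(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), writerZone));

    // Legacy files: no metadata, so fall back to the reader's zone (Instant semantics).
    ZoneId readerZone = ZoneId.of("US/Pacific");
    System.out.println(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), readerZone));
  }
}

Run as-is it prints 2019-01-01T19:00:00.999 and then 2019-01-01T16:00:00.999, the two values asserted in the test above.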