diff --git data/files/avro_timestamp.txt data/files/avro_timestamp.txt
new file mode 100644
index 0000000..a989f0e
--- /dev/null
+++ data/files/avro_timestamp.txt
@@ -0,0 +1,4 @@
+2012-02-21 07:08:09.123|foo:1980-12-16 07:08:09.123,bar:1998-05-07 07:08:09.123|2011-09-04 07:08:09.123,2011-09-05 07:08:09.123
+2014-02-11 07:08:09.123|baz:1981-12-16 07:08:09.123|2011-09-05 07:08:09.123
+1947-02-11 07:08:09.123|baz:1921-12-16 07:08:09.123|2011-09-05 07:08:09.123
+8200-02-11 07:08:09.123|baz:6981-12-16 07:08:09.123|1039-09-05 07:08:09.123
\ No newline at end of file
diff --git ql/src/test/queries/clientpositive/avro_timestamp.q ql/src/test/queries/clientpositive/avro_timestamp.q
new file mode 100644
index 0000000..7c04ede
--- /dev/null
+++ ql/src/test/queries/clientpositive/avro_timestamp.q
@@ -0,0 +1,24 @@
+DROP TABLE avro_timestamp_staging;
+DROP TABLE avro_timestamp;
+DROP TABLE avro_timestamp_casts;
+
+CREATE TABLE avro_timestamp_staging (d timestamp, m1 map<string,timestamp>, l1 array<timestamp>)
+  ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
+  COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':'
+  STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../../data/files/avro_timestamp.txt' OVERWRITE INTO TABLE avro_timestamp_staging;
+
+CREATE TABLE avro_timestamp (d timestamp, m1 map<string,timestamp>, l1 array<timestamp>)
+  PARTITIONED BY (p1 int, p2 timestamp)
+  ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
+  COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':'
+  STORED AS AVRO;
+
+INSERT OVERWRITE TABLE avro_timestamp PARTITION(p1=2, p2='2014-09-26 07:08:09.123') SELECT * FROM avro_timestamp_staging;
+
+SELECT * FROM avro_timestamp;
+SELECT d, COUNT(d) FROM avro_timestamp GROUP BY d;
+SELECT * FROM avro_timestamp WHERE d!='1947-02-11 07:08:09.123';
+SELECT * FROM avro_timestamp WHERE d<'2014-12-21 07:08:09.123';
+SELECT * FROM avro_timestamp WHERE d>'8000-12-01 07:08:09.123';
diff --git ql/src/test/results/clientpositive/avro_timestamp.q.out ql/src/test/results/clientpositive/avro_timestamp.q.out
new file mode 100644
index 0000000..3faa9d4
--- /dev/null
+++ ql/src/test/results/clientpositive/avro_timestamp.q.out
@@ -0,0 +1,126 @@
+PREHOOK: query: DROP TABLE avro_timestamp_staging
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE avro_timestamp_staging
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE avro_timestamp
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE avro_timestamp
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE avro_timestamp_casts
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE avro_timestamp_casts
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE avro_timestamp_staging (d timestamp, m1 map<string,timestamp>, l1 array<timestamp>)
+  ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
+  COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':'
+  STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@avro_timestamp_staging
+POSTHOOK: query: CREATE TABLE avro_timestamp_staging (d timestamp, m1 map<string,timestamp>, l1 array<timestamp>)
+  ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
+  COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':'
+  STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@avro_timestamp_staging
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/avro_timestamp.txt' OVERWRITE INTO TABLE avro_timestamp_staging
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@avro_timestamp_staging
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/avro_timestamp.txt' OVERWRITE INTO TABLE avro_timestamp_staging
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@avro_timestamp_staging
+PREHOOK: query: CREATE TABLE avro_timestamp (d timestamp, m1 map<string,timestamp>, l1 array<timestamp>)
+  PARTITIONED BY (p1 int, p2 timestamp)
+  ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
+  COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':'
+  STORED AS AVRO
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@avro_timestamp
+POSTHOOK: query: CREATE TABLE avro_timestamp (d timestamp, m1 map<string,timestamp>, l1 array<timestamp>)
+  PARTITIONED BY (p1 int, p2 timestamp)
+  ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
+  COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':'
+  STORED AS AVRO
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@avro_timestamp
+PREHOOK: query: INSERT OVERWRITE TABLE avro_timestamp PARTITION(p1=2, p2='2014-09-26 07:08:09.123') SELECT * FROM avro_timestamp_staging
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_timestamp_staging
+PREHOOK: Output: default@avro_timestamp@p1=2/p2=2014-09-26 07%3A08%3A09.123
+POSTHOOK: query: INSERT OVERWRITE TABLE avro_timestamp PARTITION(p1=2, p2='2014-09-26 07:08:09.123') SELECT * FROM avro_timestamp_staging
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_timestamp_staging
+POSTHOOK: Output: default@avro_timestamp@p1=2/p2=2014-09-26 07%3A08%3A09.123
+POSTHOOK: Lineage: avro_timestamp PARTITION(p1=2,p2=2014-09-26 07:08:09.123).d SIMPLE [(avro_timestamp_staging)avro_timestamp_staging.FieldSchema(name:d, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: avro_timestamp PARTITION(p1=2,p2=2014-09-26 07:08:09.123).l1 SIMPLE [(avro_timestamp_staging)avro_timestamp_staging.FieldSchema(name:l1, type:array<timestamp>, comment:null), ]
+POSTHOOK: Lineage: avro_timestamp PARTITION(p1=2,p2=2014-09-26 07:08:09.123).m1 SIMPLE [(avro_timestamp_staging)avro_timestamp_staging.FieldSchema(name:m1, type:map<string,timestamp>, comment:null), ]
+PREHOOK: query: SELECT * FROM avro_timestamp
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_timestamp
+PREHOOK: Input: default@avro_timestamp@p1=2/p2=2014-09-26 07%3A08%3A09.123
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM avro_timestamp
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_timestamp
+POSTHOOK: Input: default@avro_timestamp@p1=2/p2=2014-09-26 07%3A08%3A09.123
+#### A masked pattern was here ####
+2012-02-21 07:08:09.123 {"foo":"1980-12-16 07:08:09.123","bar":"1998-05-07 07:08:09.123"} ["2011-09-04 07:08:09.123","2011-09-05 07:08:09.123"] 2 2014-09-26 07:08:09.123
+2014-02-11 07:08:09.123 {"baz":"1981-12-16 07:08:09.123"} ["2011-09-05 07:08:09.123"] 2 2014-09-26 07:08:09.123
+1947-02-11 07:08:09.123 {"baz":"1921-12-16 07:08:09.123"} ["2011-09-05 07:08:09.123"] 2 2014-09-26 07:08:09.123
+8200-02-11 07:08:09.123 {"baz":"6981-12-16 07:08:09.123"} ["1039-09-05 07:08:09.123"] 2 2014-09-26 07:08:09.123
+PREHOOK: query: SELECT d, COUNT(d) FROM avro_timestamp GROUP BY d
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_timestamp
+PREHOOK: Input: default@avro_timestamp@p1=2/p2=2014-09-26 07%3A08%3A09.123
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT d, COUNT(d) FROM avro_timestamp GROUP BY d
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_timestamp
+POSTHOOK: Input: default@avro_timestamp@p1=2/p2=2014-09-26 07%3A08%3A09.123
+#### A masked pattern was here ####
+1947-02-11 07:08:09.123 1
+2012-02-21 07:08:09.123 1
+2014-02-11 07:08:09.123 1
+8200-02-11 07:08:09.123 1
+PREHOOK: query: SELECT * FROM avro_timestamp WHERE d!='1947-02-11 07:08:09.123'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_timestamp
+PREHOOK: Input: default@avro_timestamp@p1=2/p2=2014-09-26 07%3A08%3A09.123
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM avro_timestamp WHERE d!='1947-02-11 07:08:09.123'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_timestamp
+POSTHOOK: Input: default@avro_timestamp@p1=2/p2=2014-09-26 07%3A08%3A09.123
+#### A masked pattern was here ####
+2012-02-21 07:08:09.123 {"foo":"1980-12-16 07:08:09.123","bar":"1998-05-07 07:08:09.123"} ["2011-09-04 07:08:09.123","2011-09-05 07:08:09.123"] 2 2014-09-26 07:08:09.123
+2014-02-11 07:08:09.123 {"baz":"1981-12-16 07:08:09.123"} ["2011-09-05 07:08:09.123"] 2 2014-09-26 07:08:09.123
+8200-02-11 07:08:09.123 {"baz":"6981-12-16 07:08:09.123"} ["1039-09-05 07:08:09.123"] 2 2014-09-26 07:08:09.123
+PREHOOK: query: SELECT * FROM avro_timestamp WHERE d<'2014-12-21 07:08:09.123'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_timestamp
+PREHOOK: Input: default@avro_timestamp@p1=2/p2=2014-09-26 07%3A08%3A09.123
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM avro_timestamp WHERE d<'2014-12-21 07:08:09.123'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_timestamp
+POSTHOOK: Input: default@avro_timestamp@p1=2/p2=2014-09-26 07%3A08%3A09.123
+#### A masked pattern was here ####
+2012-02-21 07:08:09.123 {"foo":"1980-12-16 07:08:09.123","bar":"1998-05-07 07:08:09.123"} ["2011-09-04 07:08:09.123","2011-09-05 07:08:09.123"] 2 2014-09-26 07:08:09.123
+2014-02-11 07:08:09.123 {"baz":"1981-12-16 07:08:09.123"} ["2011-09-05 07:08:09.123"] 2 2014-09-26 07:08:09.123
+1947-02-11 07:08:09.123 {"baz":"1921-12-16 07:08:09.123"} ["2011-09-05 07:08:09.123"] 2 2014-09-26 07:08:09.123
+PREHOOK: query: SELECT * FROM avro_timestamp WHERE d>'8000-12-01 07:08:09.123'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_timestamp
+PREHOOK: Input: default@avro_timestamp@p1=2/p2=2014-09-26 07%3A08%3A09.123
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM avro_timestamp WHERE d>'8000-12-01 07:08:09.123'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_timestamp
+POSTHOOK: Input: default@avro_timestamp@p1=2/p2=2014-09-26 07%3A08%3A09.123
+#### A masked pattern was here ####
+8200-02-11 07:08:09.123 {"baz":"6981-12-16 07:08:09.123"} ["1039-09-05 07:08:09.123"] 2 2014-09-26 07:08:09.123
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
index 07c5ecf..d107291 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
@@ -23,6 +23,7 @@
 import java.nio.ByteBuffer;
 import java.rmi.server.UID;
 import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -287,6 +288,12 @@ private Object deserializePrimitive(Object datum, Schema fileSchema, Schema reco
       }

       return new Date(DateWritable.daysToMillis((Integer)datum));
+    case TIMESTAMP:
+      if (recordSchema.getType() != Type.LONG) {
+        throw new AvroSerdeException(
+          "Unexpected Avro schema for Timestamp TypeInfo: " + recordSchema.getType());
+      }
+      return new Timestamp((Long)datum);
     default:
       return datum;
     }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
index 7639a2b..0280b0e 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -55,12 +55,14 @@
   public static final String CHAR_TYPE_NAME = "char";
   public static final String VARCHAR_TYPE_NAME = "varchar";
   public static final String DATE_TYPE_NAME = "date";
+  public static final String TIMESTAMP_TYPE_NAME = "timestamp-millis";
   public static final String AVRO_PROP_LOGICAL_TYPE = "logicalType";
   public static final String AVRO_PROP_PRECISION = "precision";
   public static final String AVRO_PROP_SCALE = "scale";
   public static final String AVRO_PROP_MAX_LENGTH = "maxLength";
   public static final String AVRO_STRING_TYPE_NAME = "string";
   public static final String AVRO_INT_TYPE_NAME = "int";
+  public static final String AVRO_LONG_TYPE_NAME = "long";

   private ObjectInspector oi;
   private List<String> columnNames;
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
index c8eac89..809c2f2 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.serde2.avro;

 import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -207,6 +209,10 @@ private Object serializePrimitive(TypeInfo typeInfo, PrimitiveObjectInspector fi
     case DATE:
       Date date = ((DateObjectInspector)fieldOI).getPrimitiveJavaObject(structFieldData);
       return DateWritable.dateToDays(date);
+    case TIMESTAMP:
+      Timestamp timestamp =
+          ((TimestampObjectInspector) fieldOI).getPrimitiveJavaObject(structFieldData);
+      return timestamp.getTime();
     case UNKNOWN:
       throw new AvroSerdeException("Received UNKNOWN primitive category.");
     case VOID:
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
index c84b1a0..3998737 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
@@ -108,8 +108,8 @@ protected TypeInfo makeInstance(Schema s) throws AvroSerdeException {
   public static TypeInfo generateTypeInfo(Schema schema) throws AvroSerdeException {
     // For bytes type, it can be mapped to decimal.
     Schema.Type type = schema.getType();
-    if (type == Schema.Type.BYTES &&
-      AvroSerDe.DECIMAL_TYPE_NAME.equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+    if (type == BYTES && AvroSerDe.DECIMAL_TYPE_NAME
+        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       int precision = 0;
       int scale = 0;
       try {
@@ -128,8 +128,8 @@ public static TypeInfo generateTypeInfo(Schema schema) throws AvroSerdeException
       return TypeInfoFactory.getDecimalTypeInfo(precision, scale);
     }

-    if (type == Schema.Type.STRING &&
-      AvroSerDe.CHAR_TYPE_NAME.equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+    if (type == STRING &&
+        AvroSerDe.CHAR_TYPE_NAME.equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       int maxLength = 0;
       try {
         maxLength = schema.getJsonProp(AvroSerDe.AVRO_PROP_MAX_LENGTH).getValueAsInt();
@@ -139,8 +139,8 @@ public static TypeInfo generateTypeInfo(Schema schema) throws AvroSerdeException
       return TypeInfoFactory.getCharTypeInfo(maxLength);
     }

-    if (type == Schema.Type.STRING &&
-      AvroSerDe.VARCHAR_TYPE_NAME.equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+    if (type == STRING && AvroSerDe.VARCHAR_TYPE_NAME
+        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       int maxLength = 0;
       try {
         maxLength = schema.getJsonProp(AvroSerDe.AVRO_PROP_MAX_LENGTH).getValueAsInt();
@@ -150,11 +150,16 @@ public static TypeInfo generateTypeInfo(Schema schema) throws AvroSerdeException
       return TypeInfoFactory.getVarcharTypeInfo(maxLength);
     }

-    if (type == Schema.Type.INT &&
-      AvroSerDe.DATE_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+    if (type == INT &&
+        AvroSerDe.DATE_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       return TypeInfoFactory.dateTypeInfo;
     }

+    if (type == LONG &&
+        AvroSerDe.TIMESTAMP_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+      return TypeInfoFactory.timestampTypeInfo;
+    }
+
     return typeInfoCache.retrieve(schema);
   }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java
index 8cb2dc3..4f8b05f 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java
@@ -157,6 +157,11 @@ private Schema createAvroPrimitive(TypeInfo typeInfo) {
           "\"type\":\"" + AvroSerDe.AVRO_INT_TYPE_NAME + "\"," +
           "\"logicalType\":\"" + AvroSerDe.DATE_TYPE_NAME + "\"}");
       break;
+    case TIMESTAMP:
+      schema = AvroSerdeUtils.getSchemaFor("{" +
+          "\"type\":\"" + AvroSerDe.AVRO_LONG_TYPE_NAME + "\"," +
+          "\"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME + "\"}");
+      break;
     case VOID:
       schema = Schema.create(Schema.Type.NULL);
       break;
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestTypeInfoToSchema.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestTypeInfoToSchema.java
index cd5a0fa..abbf038 100644
--- serde/src/test/org/apache/hadoop/hive/serde2/avro/TestTypeInfoToSchema.java
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestTypeInfoToSchema.java
@@ -68,6 +68,8 @@
       serdeConstants.VOID_TYPE_NAME);
   private static final TypeInfo DATE = TypeInfoFactory.getPrimitiveTypeInfo(
       serdeConstants.DATE_TYPE_NAME);
+  private static final TypeInfo TIMESTAMP =
+      TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.TIMESTAMP_TYPE_NAME);
   private static final int PRECISION = 4;
   private static final int SCALE = 2;
   private static final TypeInfo DECIMAL = TypeInfoFactory.getPrimitiveTypeInfo(
@@ -255,6 +257,17 @@ public void createAvroDateSchema() {
   }

   @Test
+  public void createAvroTimestampSchema() {
+    final String specificSchema = "{" +
+        "\"type\":\"long\"," +
+        "\"logicalType\":\"timestamp-millis\"}";
+    String expectedSchema = genSchema(specificSchema);
+
+    Assert.assertEquals("Test for timestamp in avro schema failed",
+        expectedSchema, getAvroSchemaString(TIMESTAMP));
+  }
+
+  @Test
   public void createAvroListSchema() {
     ListTypeInfo listTypeInfo = new ListTypeInfo();
     listTypeInfo.setListElementTypeInfo(STRING);