diff --git a/ql/src/test/queries/clientpositive/avro_create_as_select.q b/ql/src/test/queries/clientpositive/avro_create_as_select.q
new file mode 100644
index 0000000..73f17a8
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/avro_create_as_select.q
@@ -0,0 +1,13 @@
+-- verify 1) can create table using create table as select from non-avro table 2) LOAD avro data into new table and read data from the new table
+CREATE TABLE doctors(number int, first_name STRING, last_name STRING) STORED AS TEXTFILE;
+DESCRIBE doctors;
+
+CREATE TABLE copy_doctors ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' as SELECT * FROM doctors;
+DESCRIBE copy_doctors;
+
+LOAD DATA LOCAL INPATH '../data/files/doctors.avro' INTO TABLE copy_doctors;
+
+SELECT * FROM copy_doctors ORDER BY number;
+
+
+
diff --git a/ql/src/test/queries/clientpositive/avro_create_as_select2.q b/ql/src/test/queries/clientpositive/avro_create_as_select2.q
new file mode 100644
index 0000000..0930c50
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/avro_create_as_select2.q
@@ -0,0 +1,41 @@
+-- verify 1) can create table using create table as select from another Avro table 2) read data from the newly created table
+CREATE TABLE doctors
+ROW FORMAT
+SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+STORED AS
+INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+TBLPROPERTIES ('avro.schema.literal'='{
+  "namespace": "testing.hive.avro.serde",
+  "name": "doctors",
+  "type": "record",
+  "fields": [
+    {
+      "name":"number",
+      "type":"int",
+      "doc":"Order of playing the role"
+    },
+    {
+      "name":"first_name",
+      "type":"string",
+      "doc":"first name of actor playing role"
+    },
+    {
+      "name":"last_name",
+      "type":"string",
+      "doc":"last name of actor playing role"
+    }
+  ]
+}');
+
+DESCRIBE doctors;
+
+LOAD DATA LOCAL INPATH '../data/files/doctors.avro' INTO TABLE doctors;
+
+SELECT * FROM doctors ORDER BY number;
+
+CREATE TABLE copy_doctors as SELECT * FROM doctors;
+DESCRIBE copy_doctors;
+
+SELECT * FROM copy_doctors ORDER BY number;
+
diff --git a/ql/src/test/queries/clientpositive/avro_no_schema_test.q b/ql/src/test/queries/clientpositive/avro_no_schema_test.q
new file mode 100644
index 0000000..3bfcaeb
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/avro_no_schema_test.q
@@ -0,0 +1,19 @@
+-- verify 1) we can create Avro table w/o providing Avro schema (only using Hive schema) 2) read data from the table
+CREATE TABLE doctors
+(
+number int,
+first_name string,
+last_name string
+)
+ROW FORMAT
+SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+STORED AS
+INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat';
+
+DESCRIBE doctors;
+
+LOAD DATA LOCAL INPATH '../data/files/doctors.avro' INTO TABLE doctors;
+
+SELECT * FROM doctors ORDER BY number;
+
diff --git a/ql/src/test/queries/clientpositive/avro_without_schema.q b/ql/src/test/queries/clientpositive/avro_without_schema.q
new file mode 100644
index 0000000..b09efae
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/avro_without_schema.q
@@ -0,0 +1,26 @@
+-- verify we can create Avro table w/o providing Avro schema (only using Hive schema)
+CREATE TABLE avro1
+(
+string1 string,
+int1 int,
+tinyint1 int,
+smallint1 int,
+bigint1 int,
+boolean1 boolean,
+float1 float,
+double1 double,
+list2 array,
+map1 map,
+struct1 struct,
+union1 uniontype,
+enum1 string,
+nullableint int,
+bytes1 array
+)
+ROW FORMAT
+SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+STORED AS
+INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat';
+
+DESCRIBE avro1;
diff --git a/ql/src/test/results/clientpositive/avro_create_as_select.q.out b/ql/src/test/results/clientpositive/avro_create_as_select.q.out
new file mode 100644
index 0000000..38f69fb
--- /dev/null
+++ b/ql/src/test/results/clientpositive/avro_create_as_select.q.out
@@ -0,0 +1,53 @@
+PREHOOK: query: -- verify 1) can create table using create table as select from non-avro table 2) LOAD avro data into new table and read data from the new table
+CREATE TABLE doctors(number int, first_name STRING, last_name STRING) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- verify 1) can create table using create table as select from non-avro table 2) LOAD avro data into new table and read data from the new table
+CREATE TABLE doctors(number int, first_name STRING, last_name STRING) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@doctors
+PREHOOK: query: DESCRIBE doctors
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: DESCRIBE doctors
+POSTHOOK: type: DESCTABLE
+number int None
+first_name string None
+last_name string None
+PREHOOK: query: CREATE TABLE copy_doctors ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' as SELECT * FROM doctors
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@doctors
+POSTHOOK: query: CREATE TABLE copy_doctors ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' as SELECT * FROM doctors
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@doctors
+POSTHOOK: Output: default@copy_doctors
+PREHOOK: query: DESCRIBE copy_doctors
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: DESCRIBE copy_doctors
+POSTHOOK: type: DESCTABLE
+number int from deserializer
+first_name string from deserializer
+last_name string from deserializer
+PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/doctors.avro' INTO TABLE copy_doctors
+PREHOOK: type: LOAD
+PREHOOK: Output: default@copy_doctors
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/doctors.avro' INTO TABLE copy_doctors
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@copy_doctors
+PREHOOK: query: SELECT * FROM copy_doctors ORDER BY number
+PREHOOK: type: QUERY
+PREHOOK: Input: default@copy_doctors
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM copy_doctors ORDER BY number
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@copy_doctors
+#### A masked pattern was here ####
+1 William Hartnell
+2 Patrick Troughton
+3 Jon Pertwee
+4 Tom Baker
+5 Peter Davison
+6 Colin Baker
+7 Sylvester McCoy
+8 Paul McGann
+9 Christopher Eccleston
+10 David Tennant
+11 Matt Smith
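Note: the CTAS above works because, with neither avro.schema.literal nor avro.schema.url set, the serde now falls back to generating an Avro schema from the Hive column metadata (see the AvroSerdeUtils change below). A minimal sketch of that entry point, mirroring the unit test added later in this patch; the table and column values here are illustrative:

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;

public class InferSchemaExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("name", "default.copy_doctors");           // table name
    props.put("columns", "number,first_name,last_name"); // Hive column names
    props.put("columns.types", "int:string:string");     // Hive column types
    // Falls through the literal and URL lookups to the new column-based generation.
    Schema inferred = AvroSerdeUtils.determineSchemaOrThrowException(props);
    System.out.println(inferred.toString(true));
  }
}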
diff --git a/ql/src/test/results/clientpositive/avro_create_as_select2.q.out b/ql/src/test/results/clientpositive/avro_create_as_select2.q.out
new file mode 100644
index 0000000..b2a57c5
--- /dev/null
+++ b/ql/src/test/results/clientpositive/avro_create_as_select2.q.out
@@ -0,0 +1,126 @@
+PREHOOK: query: -- verify 1) can create table using create table as select from another Avro table 2) read data from the newly created table
+CREATE TABLE doctors
+ROW FORMAT
+SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+STORED AS
+INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+TBLPROPERTIES ('avro.schema.literal'='{
+  "namespace": "testing.hive.avro.serde",
+  "name": "doctors",
+  "type": "record",
+  "fields": [
+    {
+      "name":"number",
+      "type":"int",
+      "doc":"Order of playing the role"
+    },
+    {
+      "name":"first_name",
+      "type":"string",
+      "doc":"first name of actor playing role"
+    },
+    {
+      "name":"last_name",
+      "type":"string",
+      "doc":"last name of actor playing role"
+    }
+  ]
+}')
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- verify 1) can create table using create table as select from another Avro table 2) read data from the newly created table
+CREATE TABLE doctors
+ROW FORMAT
+SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+STORED AS
+INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+TBLPROPERTIES ('avro.schema.literal'='{
+  "namespace": "testing.hive.avro.serde",
+  "name": "doctors",
+  "type": "record",
+  "fields": [
+    {
+      "name":"number",
+      "type":"int",
+      "doc":"Order of playing the role"
+    },
+    {
+      "name":"first_name",
+      "type":"string",
+      "doc":"first name of actor playing role"
+    },
+    {
+      "name":"last_name",
+      "type":"string",
+      "doc":"last name of actor playing role"
+    }
+  ]
+}')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@doctors
+PREHOOK: query: DESCRIBE doctors
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: DESCRIBE doctors
+POSTHOOK: type: DESCTABLE
+number int from deserializer
+first_name string from deserializer
+last_name string from deserializer
+PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/doctors.avro' INTO TABLE doctors
+PREHOOK: type: LOAD
+PREHOOK: Output: default@doctors
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/doctors.avro' INTO TABLE doctors
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@doctors
+PREHOOK: query: SELECT * FROM doctors ORDER BY number
+PREHOOK: type: QUERY
+PREHOOK: Input: default@doctors
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM doctors ORDER BY number
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@doctors
+#### A masked pattern was here ####
+1 William Hartnell
+2 Patrick Troughton
+3 Jon Pertwee
+4 Tom Baker
+5 Peter Davison
+6 Colin Baker
+7 Sylvester McCoy
+8 Paul McGann
+9 Christopher Eccleston
+10 David Tennant
+11 Matt Smith
+PREHOOK: query: CREATE TABLE copy_doctors as SELECT * FROM doctors
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@doctors
+POSTHOOK: query: CREATE TABLE copy_doctors as SELECT * FROM doctors
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@doctors
+POSTHOOK: Output: default@copy_doctors
+PREHOOK: query: DESCRIBE copy_doctors
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: DESCRIBE copy_doctors
+POSTHOOK: type: DESCTABLE
+number int None
+first_name string None
+last_name string None
+PREHOOK: query: SELECT * FROM copy_doctors ORDER BY number
+PREHOOK: type: QUERY
+PREHOOK: Input: default@copy_doctors
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM copy_doctors ORDER BY number
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@copy_doctors
+#### A masked pattern was here ####
+1 William Hartnell
+2 Patrick Troughton
+3 Jon Pertwee
+4 Tom Baker
+5 Peter Davison
+6 Colin Baker
+7 Sylvester McCoy
+8 Paul McGann
+9 Christopher Eccleston
+10 David Tennant
+11 Matt Smith
diff --git a/ql/src/test/results/clientpositive/avro_no_schema_test.q.out b/ql/src/test/results/clientpositive/avro_no_schema_test.q.out
new file mode 100644
index 0000000..a5a3462
--- /dev/null
+++ b/ql/src/test/results/clientpositive/avro_no_schema_test.q.out
@@ -0,0 +1,59 @@
+PREHOOK: query: -- verify 1) we can create Avro table w/o providing Avro schema (only using Hive schema) 2) read data from the table
+CREATE TABLE doctors
+(
+number int,
+first_name string,
+last_name string
+)
+ROW FORMAT
+SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+STORED AS
+INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- verify 1) we can create Avro table w/o providing Avro schema (only using Hive schema) 2) read data from the table
+CREATE TABLE doctors
+(
+number int,
+first_name string,
+last_name string
+)
+ROW FORMAT
+SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+STORED AS
+INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@doctors
+PREHOOK: query: DESCRIBE doctors
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: DESCRIBE doctors
+POSTHOOK: type: DESCTABLE
+number int from deserializer
+first_name string from deserializer
+last_name string from deserializer
+PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/doctors.avro' INTO TABLE doctors
+PREHOOK: type: LOAD
+PREHOOK: Output: default@doctors
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/doctors.avro' INTO TABLE doctors
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@doctors
+PREHOOK: query: SELECT * FROM doctors ORDER BY number
+PREHOOK: type: QUERY
+PREHOOK: Input: default@doctors
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM doctors ORDER BY number
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@doctors
+#### A masked pattern was here ####
+1 William Hartnell
+2 Patrick Troughton
+3 Jon Pertwee
+4 Tom Baker
+5 Peter Davison
+6 Colin Baker
+7 Sylvester McCoy
+8 Paul McGann
+9 Christopher Eccleston
+10 David Tennant
+11 Matt Smith
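For the schema-less doctors table above, the fallback should produce a record whose fields are each a union with null. A sketch of the expected literal, following the org.apache.hive.auto_gen_schema_&lt;table&gt; naming convention used by avroSchemaFromTableDefinition below (exact record name and whitespace are illustrative, not verified output):

import org.apache.avro.Schema;

public class DoctorsInferredSchema {
  public static void main(String[] args) {
    String literal = "{\"namespace\":\"org.apache.hive\","
        + "\"name\":\"auto_gen_schema_default_doctors\","
        + "\"type\":\"record\",\"fields\":["
        + "{\"name\":\"number\",\"type\":[\"int\",\"null\"]},"
        + "{\"name\":\"first_name\",\"type\":[\"string\",\"null\"]},"
        + "{\"name\":\"last_name\",\"type\":[\"string\",\"null\"]}]}";
    // Round-trip through the parser the same way the serde does.
    System.out.println(Schema.parse(literal).toString(true));
  }
}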
diff --git a/ql/src/test/results/clientpositive/avro_without_schema.q.out b/ql/src/test/results/clientpositive/avro_without_schema.q.out
new file mode 100644
index 0000000..b867686
--- /dev/null
+++ b/ql/src/test/results/clientpositive/avro_without_schema.q.out
@@ -0,0 +1,70 @@
+PREHOOK: query: -- verify we can create Avro table w/o providing Avro schema (only using Hive schema)
+CREATE TABLE avro1
+(
+string1 string,
+int1 int,
+tinyint1 int,
+smallint1 int,
+bigint1 int,
+boolean1 boolean,
+float1 float,
+double1 double,
+list2 array,
+map1 map,
+struct1 struct,
+union1 uniontype,
+enum1 string,
+nullableint int,
+bytes1 array
+)
+ROW FORMAT
+SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+STORED AS
+INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- verify we can create Avro table w/o providing Avro schema (only using Hive schema)
+CREATE TABLE avro1
+(
+string1 string,
+int1 int,
+tinyint1 int,
+smallint1 int,
+bigint1 int,
+boolean1 boolean,
+float1 float,
+double1 double,
+list2 array,
+map1 map,
+struct1 struct,
+union1 uniontype,
+enum1 string,
+nullableint int,
+bytes1 array
+)
+ROW FORMAT
+SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+STORED AS
+INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@avro1
+PREHOOK: query: DESCRIBE avro1
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: DESCRIBE avro1
+POSTHOOK: type: DESCTABLE
+string1 string from deserializer
+int1 int from deserializer
+tinyint1 int from deserializer
+smallint1 int from deserializer
+bigint1 int from deserializer
+boolean1 boolean from deserializer
+float1 float from deserializer
+double1 double from deserializer
+list2 array from deserializer
+map1 map from deserializer
+struct1 struct from deserializer
+union1 uniontype from deserializer
+enum1 string from deserializer
+nullableint int from deserializer
+bytes1 array from deserializer
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
index 13848b6..d3e22ed 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
@@ -18,6 +18,12 @@
 package org.apache.hadoop.hive.serde2.avro;
 
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.avro.Schema;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -26,13 +32,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.JobConf;
 
-import java.io.IOException;
-import java.net.URL;
-import java.util.List;
-import java.util.Properties;
-
 /**
  * Utilities useful only to the AvroSerde itself. Not mean to be used by
  * end-users but public for interop to the ql package.
@@ -56,18 +59,26 @@
    */
   public static Schema determineSchemaOrThrowException(Properties properties)
       throws IOException, AvroSerdeException {
+    LOG.info("determineSchemaOrThrowException props " + properties);
     String schemaString = properties.getProperty(SCHEMA_LITERAL);
-    if(schemaString != null && !schemaString.equals(SCHEMA_NONE))
+    if (schemaString != null && !schemaString.equals(SCHEMA_NONE)) {
       return Schema.parse(schemaString);
+    }
 
     // Try pulling directly from URL
     schemaString = properties.getProperty(SCHEMA_URL);
-    if(schemaString == null || schemaString.equals(SCHEMA_NONE))
+    if (schemaString == null || schemaString.equals(SCHEMA_NONE)) {
+      // Neither URL nor literal schema defined; fall back to the column definitions.
+      Schema schema = avroSchemaFromTableDefinition(properties);
+      if (schema != null) {
+        return schema;
+      }
       throw new AvroSerdeException(EXCEPTION_MESSAGE);
-
+    }
     try {
-      if(schemaString.toLowerCase().startsWith("hdfs://"))
+      if (schemaString.toLowerCase().startsWith("hdfs://")) {
         return getSchemaFromHDFS(schemaString, new Configuration());
+      }
     } catch(IOException ioe) {
       throw new AvroSerdeException("Unable to read schema from HDFS: " + schemaString, ioe);
     }
@@ -76,6 +87,59 @@ public static Schema determineSchemaOrThrowException(Properties properties)
   }
 
   /**
+   * @param properties
+   *          table properties, including the table name and the column names and types
+   *
+   * @return the equivalent Avro schema, or null if the properties are incomplete
+   * @throws AvroSerdeException
+   */
+  private static synchronized Schema avroSchemaFromTableDefinition(Properties properties)
+      throws AvroSerdeException {
+    String tableName = properties.getProperty("name", "");
+    String columnsName = properties.getProperty("columns", "");
+    String columnsType = properties.getProperty("columns.types", "");
+    // Sanity checks
+    if (columnsName.length() == 0 || columnsType.length() == 0 || tableName.length() == 0) {
+      return null;
+    }
+    // Generate a record schema from the column names and types
+    StringBuilder schemaString = new StringBuilder();
+    List<String> colNames = Arrays.asList(columnsName.split(","));
+    List<TypeInfo> colTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsType);
+    assert (colNames.size() == colTypes.size()); // Sanity check
+
+    schemaString.append("{\n");
+    schemaString.append(
+        getSchemaItem("name",
+            "org.apache.hive.auto_gen_schema_" + tableName.replaceAll("\\.", "_"))).append(",\n");
+    schemaString.append(getSchemaItem("type", "record")).append(",\n");
+    schemaString.append("\"fields\": [").append("\n");
+    int i = 0;
+    for (i = 0; i < colNames.size() - 1; i++) {
+      schemaString.append(getSchemaField(colNames.get(i), colTypes.get(i))).append(",\n");
+    }
+    schemaString.append(getSchemaField(colNames.get(i), colTypes.get(i))).append("\n]");
+    schemaString.append("}\n");
+    Schema schema = Schema.parse(schemaString.toString());
+    LOG.info("Generated schema from properties:\n" + properties
+        + "\nequivalent schema:\n" + schema.toString(true));
+    return schema;
+  }
+
+  private static String getSchemaField(String fieldName, TypeInfo fieldType)
+      throws AvroSerdeException {
+    return "{ \"name\" : \"" + fieldName + "\" , \"type\" :"
+        + TypeInfoToSchema.generateSchema(fieldType, fieldName, true).toString(true) + " }";
+  }
+
+  private static String getSchemaItem(String key, String value) {
+    return "\"" + key + "\" : \"" + value + "\"";
+  }
+
+  /**
    * Attempt to determine the schema via the usual means, but do not throw
    * an exception if we fail.  Instead, signal failure via a special
    * schema.  This is used because Hive calls init on the serde during
@@ -106,7 +170,9 @@ protected static Schema getSchemaFromHDFS(String schemaHDFSUrl,
       Schema s = Schema.parse(in);
       return s;
     } finally {
-      if(in != null) in.close();
+      if (in != null) {
+        in.close();
+      }
     }
   }
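Design note: avroSchemaFromTableDefinition assembles the record as a JSON string and round-trips it through Schema.parse. An equivalent sketch using Avro's programmatic API, which avoids quoting and escaping concerns (the class and method names here are illustrative, not part of the patch):

import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;

public class ProgrammaticRecordSketch {
  // Builds the same kind of record that avroSchemaFromTableDefinition emits,
  // but via Schema.createRecord instead of string concatenation.
  public static Schema record(String name, List<String> colNames, List<Schema> colSchemas) {
    List<Field> fields = new ArrayList<Field>();
    for (int i = 0; i < colNames.size(); i++) {
      fields.add(new Field(colNames.get(i), colSchemas.get(i), null, null));
    }
    Schema rec = Schema.createRecord(name, null, "org.apache.hive", false);
    rec.setFields(fields);
    return rec;
  }
}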
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java
new file mode 100644
index 0000000..7995bda
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java
@@ -0,0 +1,172 @@
+package org.apache.hadoop.hive.serde2.avro;
+
+import static org.apache.avro.Schema.Type.BOOLEAN;
+import static org.apache.avro.Schema.Type.DOUBLE;
+import static org.apache.avro.Schema.Type.FLOAT;
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.avro.Schema.Type.NULL;
+import static org.apache.avro.Schema.Type.STRING;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+
+/**
+ * TypeInfoToSchema is a utility class that converts a Hive type to the
+ * equivalent Avro schema.
+ *
+ * The following conversions are supported:
+ *
+ *   Hive              Avro
+ *   ======================
+ *   boolean           boolean
+ *   int               int
+ *   bigint            long
+ *   smallint          int
+ *   tinyint           int
+ *   float             float
+ *   double            double
+ *   array[smallint]   bytes
+ *   string            string
+ *   struct            record
+ *   map               map
+ *   array             array
+ *   union             union
+ *
+ * Moreover, since any type can potentially hold a NULL value, the
+ * corresponding Avro schema is a union of null and the type itself.
+ */
+class TypeInfoToSchema {
+
+  /**
+   * Convert a Hive TypeInfo object into an Avro schema.
+   *
+   * @param tInfo
+   *          the type info to be converted
+   * @param tag
+   *          used mainly to build struct/record type names
+   * @param wrapWithUnion
+   *          whether the resulting schema should be wrapped in a union with null
+   * @return the equivalent Avro schema
+   * @throws AvroSerdeException
+   */
+  public static Schema generateSchema(TypeInfo tInfo, String tag, boolean wrapWithUnion)
+      throws AvroSerdeException {
+    Category cat = tInfo.getCategory();
+    Schema schema = null;
+
+    if (primitiveTypeInfoToSchema.containsKey(tInfo)) {
+      schema = primitiveTypeInfoToSchema.get(tInfo);
+    } else {
+      switch (cat) {
+      case STRUCT:
+        schema = generateRecordSchema(tInfo, tag, wrapWithUnion);
+        break;
+      case LIST:
+        schema = generateArraySchema(tInfo, tag, wrapWithUnion);
+        break;
+      case MAP:
+        schema = generateMapSchema(tInfo, tag, wrapWithUnion);
+        break;
+      case UNION:
+        schema = generateUnionSchema(tInfo, tag);
+        break;
+      default:
+        throw new AvroSerdeException("Unsupported category: " + cat + " for type info: " + tInfo);
+      }
+    }
+
+    if (cat != Category.UNION && wrapWithUnion) {
+      return wrapWithUnion(schema);
+    } else {
+      return schema;
+    }
+  }
+
+  // Map of Hive's supported primitive types to Avro schemas
+  private static final Map<TypeInfo, Schema> primitiveTypeInfoToSchema = initTypeMap();
+
+  private static Map<TypeInfo, Schema> initTypeMap() {
+    Map<TypeInfo, Schema> theMap = new Hashtable<TypeInfo, Schema>();
+    // TODO: convert to serdeConstants
+    theMap.put(TypeInfoFactory.getPrimitiveTypeInfo("string"), Schema.create(STRING));
+    theMap.put(TypeInfoFactory.getPrimitiveTypeInfo("int"), Schema.create(INT));
+    theMap.put(TypeInfoFactory.getPrimitiveTypeInfo("smallint"), Schema.create(INT));
+    theMap.put(TypeInfoFactory.getPrimitiveTypeInfo("tinyint"), Schema.create(INT));
+    theMap.put(TypeInfoFactory.getPrimitiveTypeInfo("boolean"), Schema.create(BOOLEAN));
+    theMap.put(TypeInfoFactory.getPrimitiveTypeInfo("bigint"), Schema.create(LONG));
+    theMap.put(TypeInfoFactory.getPrimitiveTypeInfo("float"), Schema.create(FLOAT));
+    theMap.put(TypeInfoFactory.getPrimitiveTypeInfo("double"), Schema.create(DOUBLE));
+    theMap.put(TypeInfoFactory.getPrimitiveTypeInfo("void"), Schema.create(NULL));
+
+    return Collections.unmodifiableMap(theMap);
+  }
+
+  private static Schema generateUnionSchema(TypeInfo tInfo, String tag) throws AvroSerdeException {
+    assert (tInfo.getCategory().equals(Category.UNION));
+    List<Schema> types = new ArrayList<Schema>();
+    for (TypeInfo ti : ((UnionTypeInfo) tInfo).getAllUnionObjectTypeInfos()) {
+      types.add(generateSchema(ti, tag + "_" + tInfo.getCategory().name(), false));
+    }
+    return Schema.createUnion(types);
+  }
+
+  private static Schema generateRecordSchema(TypeInfo tInfo, String tag, boolean wrapWithUnion)
+      throws AvroSerdeException {
+    assert (tInfo.getCategory().equals(Category.STRUCT));
+    List<Field> fields = new ArrayList<Field>();
+    ArrayList<String> fieldNames = ((StructTypeInfo) tInfo).getAllStructFieldNames();
+    ArrayList<TypeInfo> fieldTypes = ((StructTypeInfo) tInfo).getAllStructFieldTypeInfos();
+    String newTag = tag + "_" + tInfo.getCategory().name();
+    for (int i = 0; i < fieldNames.size(); i++) {
+      fields.add(new Field(fieldNames.get(i), generateSchema(fieldTypes.get(i), newTag, false),
+          null, null));
+    }
+    Schema schema = Schema.createRecord(newTag, null, null, false);
+    schema.setFields(fields);
+    return schema;
+  }
+
+  private static Schema generateMapSchema(TypeInfo tInfo, String tag, boolean wrapWithUnion)
+      throws AvroSerdeException {
+    assert (tInfo.getCategory().equals(Category.MAP));
+    return Schema.createMap(generateSchema(((MapTypeInfo) tInfo).getMapValueTypeInfo(),
+        tag + "_" + tInfo.getCategory().name(), true));
+  }
+
+  private static Schema generateArraySchema(TypeInfo tInfo, String tag, boolean wrapWithUnion)
+      throws AvroSerdeException {
+    assert (tInfo.getCategory().equals(Category.LIST));
+    TypeInfo ltInfo = ((ListTypeInfo) tInfo).getListElementTypeInfo();
+    // TODO: handle ARRAY[SMALLINT or TINYINT]; only SMALLINT maps to bytes today
+    if (ltInfo.getCategory().equals(Category.PRIMITIVE)
+        && ((PrimitiveTypeInfo) ltInfo).getPrimitiveCategory() == PrimitiveCategory.SHORT) {
+      return Schema.create(Schema.Type.BYTES);
+    } else {
+      return Schema.createArray(generateSchema(ltInfo, tag + "_"
+          + tInfo.getCategory().name(), false));
+    }
+  }
+
+  public static Schema wrapWithUnion(Schema orgSchema) {
+    List<Schema> types = new ArrayList<Schema>();
+    types.add(orgSchema);
+    types.add(Schema.create(NULL));
+    return Schema.createUnion(types);
+  }
+}
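To make the null-wrapping rule concrete: for a Hive map&lt;string,int&gt;, generateMapSchema wraps the value schema in a union with null, and generateSchema then wraps the map itself. A sketch (declared in the serde package because TypeInfoToSchema is package-private):

package org.apache.hadoop.hive.serde2.avro;

import org.apache.avro.Schema;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class MapWrapExample {
  public static void main(String[] args) throws AvroSerdeException {
    TypeInfo mapTI = TypeInfoFactory.getMapTypeInfo(
        TypeInfoFactory.stringTypeInfo, TypeInfoFactory.intTypeInfo);
    // Expected shape: [{"type":"map","values":["int","null"]},"null"]
    Schema s = TypeInfoToSchema.generateSchema(mapTI, "example", true);
    System.out.println(s.toString(true));
  }
}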
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerdeUtils.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerdeUtils.java
index 010f614..c89b3a8 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerdeUtils.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerdeUtils.java
@@ -17,17 +17,6 @@
  */
 package org.apache.hadoop.hive.serde2.avro;
 
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.util.Properties;
-
 import static org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.EXCEPTION_MESSAGE;
 import static org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.SCHEMA_LITERAL;
 import static org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.SCHEMA_NONE;
@@ -39,6 +28,17 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.Test;
+
 public class TestAvroSerdeUtils {
   private final String NULLABLE_UNION = "{\n" +
       "  \"type\": \"record\", \n" +
@@ -198,7 +198,29 @@ public void determineSchemaCanReadSchemaFromHDFS() throws IOException, AvroSerde
       Schema expectedSchema = Schema.parse(schemaString);
       assertEquals(expectedSchema, schemaFromHDFS);
     } finally {
-      if(miniDfs != null) miniDfs.shutdown();
+      if (miniDfs != null) {
+        miniDfs.shutdown();
+      }
     }
   }
+
+  @Test
+  public void autoSchemaGenFromHiveTypes() throws IOException, AvroSerdeException {
+    String expSchemaString =
+        "{\"type\":\"record\",\"name\":\"auto_gen_schema_default_auto_schema_test\"," +
+        "\"namespace\":\"org.apache.hive\",\"fields\":[{\"name\":\"id\",\"type\":[\"long\",\"null\"]}," +
+        "{\"name\":\"address\",\"type\":[{\"type\":\"map\",\"values\":[\"int\",\"null\"]},\"null\"]}," +
+        "{\"name\":\"amounts\",\"type\":[{\"type\":\"array\",\"items\":\"float\"},\"null\"]}]}";
+
+    Properties props = new Properties();
+
+    props.put("name", "default.auto_schema_test");
+    props.put("columns", "id,address,amounts");
+    props.put("columns.types", "bigint:map<string,int>:array<float>");
+
+    Schema schema = AvroSerdeUtils.determineSchemaOrThrowException(props);
+    assertEquals(Schema.parse(expSchemaString), schema);
+  }
 }
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestTypeInfoToSchema.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestTypeInfoToSchema.java
new file mode 100644
index 0000000..f7b5d5b
--- /dev/null
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestTypeInfoToSchema.java
@@ -0,0 +1,186 @@
+package org.apache.hadoop.hive.serde2.avro;
+
+import static org.apache.avro.Schema.Type.BOOLEAN;
+import static org.apache.avro.Schema.Type.DOUBLE;
+import static org.apache.avro.Schema.Type.FLOAT;
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.avro.Schema.Type.NULL;
+import static org.apache.avro.Schema.Type.STRING;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.junit.Test;
+
+public class TestTypeInfoToSchema {
+
+  @Test
+  public void basicSchemaConversion() throws Exception {
+    assertEquals(TypeInfoToSchema.generateSchema(TypeInfoFactory.stringTypeInfo, "", true),
+        TypeInfoToSchema.wrapWithUnion(Schema.create(STRING)));
+    assertEquals(TypeInfoToSchema.generateSchema(TypeInfoFactory.intTypeInfo, "", true),
+        TypeInfoToSchema.wrapWithUnion(Schema.create(INT)));
+    assertEquals(TypeInfoToSchema.generateSchema(TypeInfoFactory.shortTypeInfo, "", true),
+        TypeInfoToSchema.wrapWithUnion(Schema.create(INT)));
+    assertEquals(TypeInfoToSchema.generateSchema(TypeInfoFactory.byteTypeInfo, "", true),
+        TypeInfoToSchema.wrapWithUnion(Schema.create(INT)));
+    assertEquals(TypeInfoToSchema.generateSchema(TypeInfoFactory.booleanTypeInfo, "", true),
+        TypeInfoToSchema.wrapWithUnion(Schema.create(BOOLEAN)));
+    assertEquals(TypeInfoToSchema.generateSchema(TypeInfoFactory.doubleTypeInfo, "", true),
+        TypeInfoToSchema.wrapWithUnion(Schema.create(DOUBLE)));
+    assertEquals(TypeInfoToSchema.generateSchema(TypeInfoFactory.floatTypeInfo, "", true),
+        TypeInfoToSchema.wrapWithUnion(Schema.create(FLOAT)));
+    assertEquals(TypeInfoToSchema.generateSchema(TypeInfoFactory.longTypeInfo, "", true),
+        TypeInfoToSchema.wrapWithUnion(Schema.create(LONG)));
+  }
+
+  @Test
+  public void arraySchemaConversion() throws Exception {
+    List<Schema> schemas = new ArrayList<Schema>();
+    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>();
+    schemas.add(Schema.createArray(TypeInfoToSchema.wrapWithUnion(Schema.create(STRING))));
+    typeInfos.add(TypeInfoFactory.getListTypeInfo(wrapTypeInfoWithUnion(TypeInfoFactory.stringTypeInfo)));
+
+    schemas.add(Schema.create(NULL));
+    typeInfos.add(TypeInfoFactory.voidTypeInfo);
+
+    Schema fromHiveSchema = TypeInfoToSchema.generateSchema(
+        TypeInfoFactory.getUnionTypeInfo(typeInfos), "", true);
+    assertEquals(fromHiveSchema, Schema.createUnion(schemas));
+  }
+
+  @Test
+  public void structSchemaConversion() throws Exception {
+    List<String> names = new ArrayList<String>();
+    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>();
+    names.add("f1");
+    typeInfos.add(wrapTypeInfoWithUnion(TypeInfoFactory.doubleTypeInfo));
+    names.add("f2");
+    typeInfos.add(wrapTypeInfoWithUnion(TypeInfoFactory.stringTypeInfo));
+    names.add("f3");
+    typeInfos.add(wrapTypeInfoWithUnion(TypeInfoFactory.booleanTypeInfo));
+    names.add("f4");
+    typeInfos.add(wrapTypeInfoWithUnion(TypeInfoFactory.doubleTypeInfo));
+    List<TypeInfo> unionTypes = new ArrayList<TypeInfo>();
+    unionTypes.add(TypeInfoFactory.getStructTypeInfo(names, typeInfos));
+    unionTypes.add(TypeInfoFactory.voidTypeInfo);
+
+    List<Field> fields = new ArrayList<Field>();
+    for (int i = 0; i < names.size(); i++) {
+      fields.add(new Field(names.get(i), TypeInfoToSchema.generateSchema(typeInfos.get(i),
+          names.get(i), false), null, null));
+    }
+
+    List<Schema> schemas = new ArrayList<Schema>();
+    Schema expSchema = Schema.createRecord("TestName_UNION_STRUCT", null, null, false);
+    expSchema.setFields(fields);
+    schemas.add(expSchema);
+    schemas.add(Schema.create(NULL));
+
+    assertEquals(TypeInfoToSchema.generateSchema(
+        TypeInfoFactory.getUnionTypeInfo(unionTypes), "TestName", true),
+        Schema.createUnion(schemas));
+  }
+
+  @Test
+  public void unionSchemaConversion() throws Exception {
+    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>();
+    List<Schema> schemas = new ArrayList<Schema>();
+    typeInfos.add(TypeInfoFactory.doubleTypeInfo);
+    schemas.add(TypeInfoToSchema.generateSchema(TypeInfoFactory.doubleTypeInfo, "", false));
+    typeInfos.add(TypeInfoFactory.stringTypeInfo);
+    schemas.add(TypeInfoToSchema.generateSchema(TypeInfoFactory.stringTypeInfo, "", false));
+    typeInfos.add(TypeInfoFactory.booleanTypeInfo);
+    schemas.add(TypeInfoToSchema.generateSchema(TypeInfoFactory.booleanTypeInfo, "", false));
+    typeInfos.add(TypeInfoFactory.intTypeInfo);
+    schemas.add(TypeInfoToSchema.generateSchema(TypeInfoFactory.intTypeInfo, "", false));
+
+    assertEquals(TypeInfoToSchema.generateSchema(TypeInfoFactory.getUnionTypeInfo(typeInfos), "", false),
+        Schema.createUnion(schemas));
+  }
+
+  @Test
+  public void mapSchemaConversion() throws Exception {
+    TypeInfo mapTI = TypeInfoFactory.getMapTypeInfo(
+        wrapTypeInfoWithUnion(TypeInfoFactory.stringTypeInfo),
+        wrapTypeInfoWithUnion(TypeInfoFactory.booleanTypeInfo));
+    Schema sch = Schema.createMap(
+        TypeInfoToSchema.generateSchema(TypeInfoFactory.booleanTypeInfo, "", true));
+
+    List<Schema> schemas = new ArrayList<Schema>();
+    schemas.add(sch);
+    schemas.add(Schema.create(NULL));
+    Schema genSch = TypeInfoToSchema.generateSchema(mapTI, "", true);
+    assertEquals(genSch, Schema.createUnion(schemas));
+  }
+
+  @Test
+  public void combineSchemaConversion1() throws Exception {
+    // Create a list of double
+    List<Schema> listSch = new ArrayList<Schema>();
+    TypeInfo listTI = getListInfo(wrapTypeInfoWithUnion(TypeInfoFactory.doubleTypeInfo), listSch);
+
+    // Create a struct of (list of double, string)
+    List<Schema> structSch = new ArrayList<Schema>();
+    TypeInfo structTI = getStruct(listTI, wrapTypeInfoWithUnion(TypeInfoFactory.stringTypeInfo),
+        structSch, "TestName_UNION_STRUCT");
+
+    assertEquals(TypeInfoToSchema.generateSchema(structTI, "TestName", true), structSch.get(0));
+  }
+
+  private TypeInfo getStruct(TypeInfo t1, TypeInfo t2, List<Schema> sch, String tag)
+      throws Exception {
+    List<String> names = new ArrayList<String>();
+    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>();
+    names.add("f1");
+    typeInfos.add(t1);
+    names.add("f2");
+    typeInfos.add(t2);
+    List<TypeInfo> unionTypes = new ArrayList<TypeInfo>();
+    unionTypes.add(TypeInfoFactory.getStructTypeInfo(names, typeInfos));
+    unionTypes.add(TypeInfoFactory.voidTypeInfo);
+
+    List<Field> fields = new ArrayList<Field>();
+    for (int i = 0; i < names.size(); i++) {
+      fields.add(new Field(names.get(i), TypeInfoToSchema.generateSchema(typeInfos.get(i),
+          names.get(i), false), null, null));
+    }
+
+    List<Schema> schemas = new ArrayList<Schema>();
+    Schema expSchema = Schema.createRecord(tag, null, null, false);
+    expSchema.setFields(fields);
+    schemas.add(expSchema);
+    schemas.add(Schema.create(NULL));
+
+    sch.add(0, Schema.createUnion(schemas));
+    return TypeInfoFactory.getUnionTypeInfo(unionTypes);
+  }
+
+  private TypeInfo getListInfo(TypeInfo t1, List<Schema> sch) throws Exception {
+    List<Schema> schemas = new ArrayList<Schema>();
+    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>();
+
+    schemas.add(Schema.createArray(TypeInfoToSchema.generateSchema(t1, "", true)));
+    typeInfos.add(TypeInfoFactory.getListTypeInfo(t1));
+
+    schemas.add(Schema.create(NULL));
+    typeInfos.add(TypeInfoFactory.voidTypeInfo);
+
+    sch.add(0, Schema.createUnion(schemas));
+    return TypeInfoFactory.getUnionTypeInfo(typeInfos);
+  }
+
+  private TypeInfo wrapTypeInfoWithUnion(TypeInfo tInfo) {
+    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>();
+    typeInfos.add(tInfo);
+    typeInfos.add(TypeInfoFactory.voidTypeInfo);
+
+    return TypeInfoFactory.getUnionTypeInfo(typeInfos);
+  }
+}