diff --git data/files/bintest.avro data/files/bintest.avro new file mode 100644 index 0000000..79864bf Binary files /dev/null and data/files/bintest.avro differ diff --git ql/src/test/queries/clientpositive/avro_binary_test.q ql/src/test/queries/clientpositive/avro_binary_test.q new file mode 100644 index 0000000..5f66282 --- /dev/null +++ ql/src/test/queries/clientpositive/avro_binary_test.q @@ -0,0 +1,26 @@ +-- verify that we can read avro BYTES type into Hive's BINARY format +CREATE TABLE bintest +ROW FORMAT +SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' +STORED AS +INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' +OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' +TBLPROPERTIES ('avro.schema.literal'='{ + "namespace": "testing.hive.avro.serde", + "name": "bintest", + "type": "record", + "fields": [ + { + "name":"binitem", + "type":"bytes", + "doc":"some Avro BYTES info we are testing on" + } + ] +}'); + +DESCRIBE bintest; + +LOAD DATA LOCAL INPATH '../data/files/bintest.avro' INTO TABLE bintest; + +SELECT * FROM bintest; + diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java index 041e659..ac9f05d 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java @@ -159,11 +159,15 @@ class AvroDeserializer { if(AvroSerdeUtils.isNullableType(recordSchema)) return deserializeNullableUnion(datum, recordSchema, columnType); - if(columnType == TypeInfoFactory.stringTypeInfo) + if (columnType == TypeInfoFactory.stringTypeInfo) return datum.toString(); // To workaround AvroUTF8 // This also gets us around the Enum issue since we just take the value // and convert it to a string. Yay! 
+ if (columnType == TypeInfoFactory.binaryTypeInfo) { + return ((ByteBuffer)datum).array(); // return as raw byte[] — NOTE(review): ByteBuffer.array() assumes an array-backed buffer whose arrayOffset is 0 and whose data fills the whole backing array; confirm, or copy bb.remaining() bytes instead + } + switch(columnType.getCategory()) { case STRUCT: return deserializeStruct((GenericData.Record) datum, (StructTypeInfo) columnType); diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java index 42f38e7..96be145 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java @@ -137,6 +137,8 @@ class AvroSerializer { private Object serializePrimitive(TypeInfo typeInfo, PrimitiveObjectInspector fieldOI, Object structFieldData) throws AvroSerdeException { switch(fieldOI.getPrimitiveCategory()) { + case BINARY: // convert to Avro BYTES which is byte[] inside ByteBuffer + return ByteBuffer.wrap((byte[])fieldOI.getPrimitiveJavaObject(structFieldData)); case UNKNOWN: throw new AvroSerdeException("Received UNKNOWN primitive category."); case VOID: @@ -156,32 +158,13 @@ class AvroSerializer { schema.getTypes().get(tag)); } - // We treat FIXED and BYTES as arrays of tinyints within Hive. Check - // if we're dealing with either of these types and thus need to serialize - // them as their Avro types. 
- private boolean isTransformedType(Schema schema) { - return schema.getType().equals(FIXED) || schema.getType().equals(BYTES); - } - - private Object serializeTransformedType(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException { - if(LOG.isDebugEnabled()) { - LOG.debug("Beginning to transform " + typeInfo + " with Avro schema " + schema.toString(false)); - } - if(schema.getType().equals(FIXED)) return serializedAvroFixed(typeInfo, fieldOI, structFieldData, schema); - else return serializeAvroBytes(typeInfo, fieldOI, structFieldData, schema); - - } - - private Object serializeAvroBytes(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException { - ByteBuffer bb = ByteBuffer.wrap(extraByteArray(fieldOI, structFieldData)); - return bb.rewind(); - } - + // We treat FIXED as an array of tinyints within Hive. Check + // if we're dealing with FIXED and need to serialize as Avro type. private Object serializedAvroFixed(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException { return new GenericData.Fixed(schema, extraByteArray(fieldOI, structFieldData)); } - // For transforming to BYTES and FIXED, pull out the byte array Avro will want + // For transforming FIXED, pull out the byte array Avro will want private byte[] extraByteArray(ListObjectInspector fieldOI, Object structFieldData) throws AvroSerdeException { // Grab a book. This is going to be slow. 
int listLength = fieldOI.getListLength(structFieldData); @@ -200,9 +183,13 @@ class AvroSerializer { } private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException { - if(isTransformedType(schema)) - return serializeTransformedType(typeInfo, fieldOI, structFieldData, schema); - + if(schema.getType().equals(FIXED)) { + if(LOG.isDebugEnabled()) { + LOG.debug("Beginning to transform " + typeInfo + " with Avro schema " + schema.toString(false)); + } + return serializedAvroFixed(typeInfo, fieldOI, structFieldData, schema); + } + List list = fieldOI.getList(structFieldData); List deserialized = new ArrayList(list.size()); diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java index ff13d3f..c741d0c 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java @@ -27,6 +27,7 @@ import java.util.Hashtable; import java.util.List; import java.util.Map; +import static org.apache.avro.Schema.Type.BYTES; import static org.apache.avro.Schema.Type.BOOLEAN; import static org.apache.avro.Schema.Type.DOUBLE; import static org.apache.avro.Schema.Type.FLOAT; @@ -47,7 +48,7 @@ class SchemaToTypeInfo { // long bigint check // float double check // double double check - // bytes + // bytes binary check // string string check // tinyint // smallint @@ -56,6 +57,7 @@ class SchemaToTypeInfo { private static final Map primitiveTypeToTypeInfo = initTypeMap(); private static Map initTypeMap() { Map theMap = new Hashtable(); + theMap.put(BYTES, TypeInfoFactory.getPrimitiveTypeInfo("binary")); theMap.put(STRING, TypeInfoFactory.getPrimitiveTypeInfo("string")); theMap.put(INT, TypeInfoFactory.getPrimitiveTypeInfo("int")); theMap.put(BOOLEAN, TypeInfoFactory.getPrimitiveTypeInfo("boolean")); @@ -180,25 +182,21 @@ class 
SchemaToTypeInfo { // convert as such. private static TypeInfo generateEnumTypeInfo(Schema schema) { assert schema.getType().equals(Schema.Type.ENUM); - return TypeInfoFactory.getPrimitiveTypeInfo("string"); } - // Hive doesn't have a Fixed type, so we're going to treat them as arrays of - // bytes + // Hive doesn't have a Fixed type, so we're going to treat as array of bytes // TODO: Make note in documentation that Hive sends these out as signed bytes. - private static final TypeInfo FIXED_AND_BYTES_EQUIV = + private static final TypeInfo FIXED_EQUIV = TypeInfoFactory.getListTypeInfo(TypeInfoFactory.byteTypeInfo); private static TypeInfo generateFixedTypeInfo(Schema schema) { assert schema.getType().equals(Schema.Type.FIXED); - - return FIXED_AND_BYTES_EQUIV; + return FIXED_EQUIV; } - // Avro considers bytes to be a primitive type, but Hive doesn't. We'll - // convert them to a list of bytes, just like Fixed. Sigh. + // Avro considers bytes to be a primitive type, translated to Hive's "binary" type private static TypeInfo generateBytesTypeInfo(Schema schema) { assert schema.getType().equals(Schema.Type.BYTES); - return FIXED_AND_BYTES_EQUIV; + return TypeInfoFactory.getPrimitiveTypeInfo("binary"); } }