diff --git hbase-handler/pom.xml hbase-handler/pom.xml
index 7c3524c..7120d64 100644
--- hbase-handler/pom.xml
+++ hbase-handler/pom.xml
@@ -82,6 +82,11 @@
       <version>${junit.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-test</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
@@ -217,11 +222,55 @@
   <profiles>
+    <profile>
+      <id>avroif</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro-maven-plugin</artifactId>
+            <version>${avro.version}</version>
+            <executions>
+              <execution>
+                <id>schemas</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>protocol</goal>
+                </goals>
+                <configuration>
+                  <sourceDirectory>${basedir}/src/if/avro/</sourceDirectory>
+                  <outputDirectory>${basedir}/src/gen/avro/gen-java/</outputDirectory>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   <build>
     <sourceDirectory>${basedir}/src/java</sourceDirectory>
     <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-test-sources</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/gen/avro/gen-java</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
   </build>
-
diff --git hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/Address.java hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/Address.java
new file mode 100644
index 0000000..9cf1dcb
--- /dev/null
+++ hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/Address.java
@@ -0,0 +1,465 @@
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.hadoop.hive.hbase.avro;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class Address extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Address\",\"namespace\":\"org.apache.hadoop.hive.hbase.avro\",\"fields\":[{\"name\":\"address1\",\"type\":\"string\"},{\"name\":\"address2\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"zipcode\",\"type\":\"long\"},{\"name\":\"county\",\"type\":[{\"type\":\"record\",\"name\":\"HomePhone\",\"fields\":[{\"name\":\"areaCode\",\"type\":\"long\"},{\"name\":\"number\",\"type\":\"long\"}]},{\"type\":\"record\",\"name\":\"OfficePhone\",\"fields\":[{\"name\":\"areaCode\",\"type\":\"long\"},{\"name\":\"number\",\"type\":\"long\"}]},\"string\",\"null\"]},{\"name\":\"aliases\",\"type\":[{\"type\":\"array\",\"items\":\"string\"},\"null\"]},{\"name\":\"metadata\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}]}]}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+  @Deprecated public java.lang.CharSequence address1;
+  @Deprecated public java.lang.CharSequence address2;
+  @Deprecated public java.lang.CharSequence city;
+  @Deprecated public long zipcode;
+  @Deprecated public java.lang.Object county;
+  @Deprecated public java.util.List<java.lang.CharSequence> aliases;
+  @Deprecated public java.util.Map<java.lang.CharSequence,java.lang.CharSequence> metadata;
+
+  /**
+   * Default constructor. Note that this does not initialize fields
+   * to their default values from the schema. If that is desired then
+   * one should use {@link \#newBuilder()}.
+   */
+  public Address() {}
+
+  /**
+   * All-args constructor.
+   */
+  public Address(java.lang.CharSequence address1, java.lang.CharSequence address2, java.lang.CharSequence city, java.lang.Long zipcode, java.lang.Object county, java.util.List<java.lang.CharSequence> aliases, java.util.Map<java.lang.CharSequence,java.lang.CharSequence> metadata) {
+    this.address1 = address1;
+    this.address2 = address2;
+    this.city = city;
+    this.zipcode = zipcode;
+    this.county = county;
+    this.aliases = aliases;
+    this.metadata = metadata;
+  }
+
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) { + switch (field$) { + case 0: return address1; + case 1: return address2; + case 2: return city; + case 3: return zipcode; + case 4: return county; + case 5: return aliases; + case 6: return metadata; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: address1 = (java.lang.CharSequence)value$; break; + case 1: address2 = (java.lang.CharSequence)value$; break; + case 2: city = (java.lang.CharSequence)value$; break; + case 3: zipcode = (java.lang.Long)value$; break; + case 4: county = (java.lang.Object)value$; break; + case 5: aliases = (java.util.List)value$; break; + case 6: metadata = (java.util.Map)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'address1' field. + */ + public java.lang.CharSequence getAddress1() { + return address1; + } + + /** + * Sets the value of the 'address1' field. + * @param value the value to set. + */ + public void setAddress1(java.lang.CharSequence value) { + this.address1 = value; + } + + /** + * Gets the value of the 'address2' field. + */ + public java.lang.CharSequence getAddress2() { + return address2; + } + + /** + * Sets the value of the 'address2' field. + * @param value the value to set. + */ + public void setAddress2(java.lang.CharSequence value) { + this.address2 = value; + } + + /** + * Gets the value of the 'city' field. + */ + public java.lang.CharSequence getCity() { + return city; + } + + /** + * Sets the value of the 'city' field. + * @param value the value to set. + */ + public void setCity(java.lang.CharSequence value) { + this.city = value; + } + + /** + * Gets the value of the 'zipcode' field. + */ + public java.lang.Long getZipcode() { + return zipcode; + } + + /** + * Sets the value of the 'zipcode' field. + * @param value the value to set. + */ + public void setZipcode(java.lang.Long value) { + this.zipcode = value; + } + + /** + * Gets the value of the 'county' field. + */ + public java.lang.Object getCounty() { + return county; + } + + /** + * Sets the value of the 'county' field. + * @param value the value to set. + */ + public void setCounty(java.lang.Object value) { + this.county = value; + } + + /** + * Gets the value of the 'aliases' field. + */ + public java.util.List getAliases() { + return aliases; + } + + /** + * Sets the value of the 'aliases' field. + * @param value the value to set. + */ + public void setAliases(java.util.List value) { + this.aliases = value; + } + + /** + * Gets the value of the 'metadata' field. + */ + public java.util.Map getMetadata() { + return metadata; + } + + /** + * Sets the value of the 'metadata' field. + * @param value the value to set. 
+ */ + public void setMetadata(java.util.Map value) { + this.metadata = value; + } + + /** Creates a new Address RecordBuilder */ + public static org.apache.hadoop.hive.hbase.avro.Address.Builder newBuilder() { + return new org.apache.hadoop.hive.hbase.avro.Address.Builder(); + } + + /** Creates a new Address RecordBuilder by copying an existing Builder */ + public static org.apache.hadoop.hive.hbase.avro.Address.Builder newBuilder(org.apache.hadoop.hive.hbase.avro.Address.Builder other) { + return new org.apache.hadoop.hive.hbase.avro.Address.Builder(other); + } + + /** Creates a new Address RecordBuilder by copying an existing Address instance */ + public static org.apache.hadoop.hive.hbase.avro.Address.Builder newBuilder(org.apache.hadoop.hive.hbase.avro.Address other) { + return new org.apache.hadoop.hive.hbase.avro.Address.Builder(other); + } + + /** + * RecordBuilder for Address instances. + */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase
+      <Address> implements org.apache.avro.data.RecordBuilder<Address>
{ + + private java.lang.CharSequence address1; + private java.lang.CharSequence address2; + private java.lang.CharSequence city; + private long zipcode; + private java.lang.Object county; + private java.util.List aliases; + private java.util.Map metadata; + + /** Creates a new Builder */ + private Builder() { + super(org.apache.hadoop.hive.hbase.avro.Address.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(org.apache.hadoop.hive.hbase.avro.Address.Builder other) { + super(other); + if (isValidValue(fields()[0], other.address1)) { + this.address1 = data().deepCopy(fields()[0].schema(), other.address1); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.address2)) { + this.address2 = data().deepCopy(fields()[1].schema(), other.address2); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.city)) { + this.city = data().deepCopy(fields()[2].schema(), other.city); + fieldSetFlags()[2] = true; + } + if (isValidValue(fields()[3], other.zipcode)) { + this.zipcode = data().deepCopy(fields()[3].schema(), other.zipcode); + fieldSetFlags()[3] = true; + } + if (isValidValue(fields()[4], other.county)) { + this.county = data().deepCopy(fields()[4].schema(), other.county); + fieldSetFlags()[4] = true; + } + if (isValidValue(fields()[5], other.aliases)) { + this.aliases = data().deepCopy(fields()[5].schema(), other.aliases); + fieldSetFlags()[5] = true; + } + if (isValidValue(fields()[6], other.metadata)) { + this.metadata = data().deepCopy(fields()[6].schema(), other.metadata); + fieldSetFlags()[6] = true; + } + } + + /** Creates a Builder by copying an existing Address instance */ + private Builder(org.apache.hadoop.hive.hbase.avro.Address other) { + super(org.apache.hadoop.hive.hbase.avro.Address.SCHEMA$); + if (isValidValue(fields()[0], other.address1)) { + this.address1 = data().deepCopy(fields()[0].schema(), other.address1); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.address2)) { + this.address2 = data().deepCopy(fields()[1].schema(), other.address2); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.city)) { + this.city = data().deepCopy(fields()[2].schema(), other.city); + fieldSetFlags()[2] = true; + } + if (isValidValue(fields()[3], other.zipcode)) { + this.zipcode = data().deepCopy(fields()[3].schema(), other.zipcode); + fieldSetFlags()[3] = true; + } + if (isValidValue(fields()[4], other.county)) { + this.county = data().deepCopy(fields()[4].schema(), other.county); + fieldSetFlags()[4] = true; + } + if (isValidValue(fields()[5], other.aliases)) { + this.aliases = data().deepCopy(fields()[5].schema(), other.aliases); + fieldSetFlags()[5] = true; + } + if (isValidValue(fields()[6], other.metadata)) { + this.metadata = data().deepCopy(fields()[6].schema(), other.metadata); + fieldSetFlags()[6] = true; + } + } + + /** Gets the value of the 'address1' field */ + public java.lang.CharSequence getAddress1() { + return address1; + } + + /** Sets the value of the 'address1' field */ + public org.apache.hadoop.hive.hbase.avro.Address.Builder setAddress1(java.lang.CharSequence value) { + validate(fields()[0], value); + this.address1 = value; + fieldSetFlags()[0] = true; + return this; + } + + /** Checks whether the 'address1' field has been set */ + public boolean hasAddress1() { + return fieldSetFlags()[0]; + } + + /** Clears the value of the 'address1' field */ + public org.apache.hadoop.hive.hbase.avro.Address.Builder clearAddress1() { + address1 = null; + 
fieldSetFlags()[0] = false; + return this; + } + + /** Gets the value of the 'address2' field */ + public java.lang.CharSequence getAddress2() { + return address2; + } + + /** Sets the value of the 'address2' field */ + public org.apache.hadoop.hive.hbase.avro.Address.Builder setAddress2(java.lang.CharSequence value) { + validate(fields()[1], value); + this.address2 = value; + fieldSetFlags()[1] = true; + return this; + } + + /** Checks whether the 'address2' field has been set */ + public boolean hasAddress2() { + return fieldSetFlags()[1]; + } + + /** Clears the value of the 'address2' field */ + public org.apache.hadoop.hive.hbase.avro.Address.Builder clearAddress2() { + address2 = null; + fieldSetFlags()[1] = false; + return this; + } + + /** Gets the value of the 'city' field */ + public java.lang.CharSequence getCity() { + return city; + } + + /** Sets the value of the 'city' field */ + public org.apache.hadoop.hive.hbase.avro.Address.Builder setCity(java.lang.CharSequence value) { + validate(fields()[2], value); + this.city = value; + fieldSetFlags()[2] = true; + return this; + } + + /** Checks whether the 'city' field has been set */ + public boolean hasCity() { + return fieldSetFlags()[2]; + } + + /** Clears the value of the 'city' field */ + public org.apache.hadoop.hive.hbase.avro.Address.Builder clearCity() { + city = null; + fieldSetFlags()[2] = false; + return this; + } + + /** Gets the value of the 'zipcode' field */ + public java.lang.Long getZipcode() { + return zipcode; + } + + /** Sets the value of the 'zipcode' field */ + public org.apache.hadoop.hive.hbase.avro.Address.Builder setZipcode(long value) { + validate(fields()[3], value); + this.zipcode = value; + fieldSetFlags()[3] = true; + return this; + } + + /** Checks whether the 'zipcode' field has been set */ + public boolean hasZipcode() { + return fieldSetFlags()[3]; + } + + /** Clears the value of the 'zipcode' field */ + public org.apache.hadoop.hive.hbase.avro.Address.Builder clearZipcode() { + fieldSetFlags()[3] = false; + return this; + } + + /** Gets the value of the 'county' field */ + public java.lang.Object getCounty() { + return county; + } + + /** Sets the value of the 'county' field */ + public org.apache.hadoop.hive.hbase.avro.Address.Builder setCounty(java.lang.Object value) { + validate(fields()[4], value); + this.county = value; + fieldSetFlags()[4] = true; + return this; + } + + /** Checks whether the 'county' field has been set */ + public boolean hasCounty() { + return fieldSetFlags()[4]; + } + + /** Clears the value of the 'county' field */ + public org.apache.hadoop.hive.hbase.avro.Address.Builder clearCounty() { + county = null; + fieldSetFlags()[4] = false; + return this; + } + + /** Gets the value of the 'aliases' field */ + public java.util.List getAliases() { + return aliases; + } + + /** Sets the value of the 'aliases' field */ + public org.apache.hadoop.hive.hbase.avro.Address.Builder setAliases(java.util.List value) { + validate(fields()[5], value); + this.aliases = value; + fieldSetFlags()[5] = true; + return this; + } + + /** Checks whether the 'aliases' field has been set */ + public boolean hasAliases() { + return fieldSetFlags()[5]; + } + + /** Clears the value of the 'aliases' field */ + public org.apache.hadoop.hive.hbase.avro.Address.Builder clearAliases() { + aliases = null; + fieldSetFlags()[5] = false; + return this; + } + + /** Gets the value of the 'metadata' field */ + public java.util.Map getMetadata() { + return metadata; + } + + /** Sets the value of the 'metadata' 
field */ + public org.apache.hadoop.hive.hbase.avro.Address.Builder setMetadata(java.util.Map value) { + validate(fields()[6], value); + this.metadata = value; + fieldSetFlags()[6] = true; + return this; + } + + /** Checks whether the 'metadata' field has been set */ + public boolean hasMetadata() { + return fieldSetFlags()[6]; + } + + /** Clears the value of the 'metadata' field */ + public org.apache.hadoop.hive.hbase.avro.Address.Builder clearMetadata() { + metadata = null; + fieldSetFlags()[6] = false; + return this; + } + + @Override + public Address build() { + try { + Address record = new Address(); + record.address1 = fieldSetFlags()[0] ? this.address1 : (java.lang.CharSequence) defaultValue(fields()[0]); + record.address2 = fieldSetFlags()[1] ? this.address2 : (java.lang.CharSequence) defaultValue(fields()[1]); + record.city = fieldSetFlags()[2] ? this.city : (java.lang.CharSequence) defaultValue(fields()[2]); + record.zipcode = fieldSetFlags()[3] ? this.zipcode : (java.lang.Long) defaultValue(fields()[3]); + record.county = fieldSetFlags()[4] ? this.county : (java.lang.Object) defaultValue(fields()[4]); + record.aliases = fieldSetFlags()[5] ? this.aliases : (java.util.List) defaultValue(fields()[5]); + record.metadata = fieldSetFlags()[6] ? this.metadata : (java.util.Map) defaultValue(fields()[6]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } +} diff --git hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/ContactInfo.java hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/ContactInfo.java new file mode 100644 index 0000000..61dfc35 --- /dev/null +++ hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/ContactInfo.java @@ -0,0 +1,250 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.hadoop.hive.hbase.avro; +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public class ContactInfo extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"ContactInfo\",\"namespace\":\"org.apache.hadoop.hive.hbase.avro\",\"fields\":[{\"name\":\"address\",\"type\":[{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Address\",\"fields\":[{\"name\":\"address1\",\"type\":\"string\"},{\"name\":\"address2\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"zipcode\",\"type\":\"long\"},{\"name\":\"county\",\"type\":[{\"type\":\"record\",\"name\":\"HomePhone\",\"fields\":[{\"name\":\"areaCode\",\"type\":\"long\"},{\"name\":\"number\",\"type\":\"long\"}]},{\"type\":\"record\",\"name\":\"OfficePhone\",\"fields\":[{\"name\":\"areaCode\",\"type\":\"long\"},{\"name\":\"number\",\"type\":\"long\"}]},\"string\",\"null\"]},{\"name\":\"aliases\",\"type\":[{\"type\":\"array\",\"items\":\"string\"},\"null\"]},{\"name\":\"metadata\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}]}]}},\"null\"]},{\"name\":\"homePhone\",\"type\":\"HomePhone\"},{\"name\":\"officePhone\",\"type\":\"OfficePhone\"}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + @Deprecated public java.util.List address; + @Deprecated public org.apache.hadoop.hive.hbase.avro.HomePhone homePhone; + @Deprecated public org.apache.hadoop.hive.hbase.avro.OfficePhone officePhone; + + /** + * Default constructor. 
Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use {@link \#newBuilder()}. + */ + public ContactInfo() {} + + /** + * All-args constructor. + */ + public ContactInfo(java.util.List address, org.apache.hadoop.hive.hbase.avro.HomePhone homePhone, org.apache.hadoop.hive.hbase.avro.OfficePhone officePhone) { + this.address = address; + this.homePhone = homePhone; + this.officePhone = officePhone; + } + + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return address; + case 1: return homePhone; + case 2: return officePhone; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: address = (java.util.List)value$; break; + case 1: homePhone = (org.apache.hadoop.hive.hbase.avro.HomePhone)value$; break; + case 2: officePhone = (org.apache.hadoop.hive.hbase.avro.OfficePhone)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'address' field. + */ + public java.util.List getAddress() { + return address; + } + + /** + * Sets the value of the 'address' field. + * @param value the value to set. + */ + public void setAddress(java.util.List value) { + this.address = value; + } + + /** + * Gets the value of the 'homePhone' field. + */ + public org.apache.hadoop.hive.hbase.avro.HomePhone getHomePhone() { + return homePhone; + } + + /** + * Sets the value of the 'homePhone' field. + * @param value the value to set. + */ + public void setHomePhone(org.apache.hadoop.hive.hbase.avro.HomePhone value) { + this.homePhone = value; + } + + /** + * Gets the value of the 'officePhone' field. + */ + public org.apache.hadoop.hive.hbase.avro.OfficePhone getOfficePhone() { + return officePhone; + } + + /** + * Sets the value of the 'officePhone' field. + * @param value the value to set. + */ + public void setOfficePhone(org.apache.hadoop.hive.hbase.avro.OfficePhone value) { + this.officePhone = value; + } + + /** Creates a new ContactInfo RecordBuilder */ + public static org.apache.hadoop.hive.hbase.avro.ContactInfo.Builder newBuilder() { + return new org.apache.hadoop.hive.hbase.avro.ContactInfo.Builder(); + } + + /** Creates a new ContactInfo RecordBuilder by copying an existing Builder */ + public static org.apache.hadoop.hive.hbase.avro.ContactInfo.Builder newBuilder(org.apache.hadoop.hive.hbase.avro.ContactInfo.Builder other) { + return new org.apache.hadoop.hive.hbase.avro.ContactInfo.Builder(other); + } + + /** Creates a new ContactInfo RecordBuilder by copying an existing ContactInfo instance */ + public static org.apache.hadoop.hive.hbase.avro.ContactInfo.Builder newBuilder(org.apache.hadoop.hive.hbase.avro.ContactInfo other) { + return new org.apache.hadoop.hive.hbase.avro.ContactInfo.Builder(other); + } + + /** + * RecordBuilder for ContactInfo instances. 
+ */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private java.util.List address; + private org.apache.hadoop.hive.hbase.avro.HomePhone homePhone; + private org.apache.hadoop.hive.hbase.avro.OfficePhone officePhone; + + /** Creates a new Builder */ + private Builder() { + super(org.apache.hadoop.hive.hbase.avro.ContactInfo.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(org.apache.hadoop.hive.hbase.avro.ContactInfo.Builder other) { + super(other); + if (isValidValue(fields()[0], other.address)) { + this.address = data().deepCopy(fields()[0].schema(), other.address); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.homePhone)) { + this.homePhone = data().deepCopy(fields()[1].schema(), other.homePhone); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.officePhone)) { + this.officePhone = data().deepCopy(fields()[2].schema(), other.officePhone); + fieldSetFlags()[2] = true; + } + } + + /** Creates a Builder by copying an existing ContactInfo instance */ + private Builder(org.apache.hadoop.hive.hbase.avro.ContactInfo other) { + super(org.apache.hadoop.hive.hbase.avro.ContactInfo.SCHEMA$); + if (isValidValue(fields()[0], other.address)) { + this.address = data().deepCopy(fields()[0].schema(), other.address); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.homePhone)) { + this.homePhone = data().deepCopy(fields()[1].schema(), other.homePhone); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.officePhone)) { + this.officePhone = data().deepCopy(fields()[2].schema(), other.officePhone); + fieldSetFlags()[2] = true; + } + } + + /** Gets the value of the 'address' field */ + public java.util.List getAddress() { + return address; + } + + /** Sets the value of the 'address' field */ + public org.apache.hadoop.hive.hbase.avro.ContactInfo.Builder setAddress(java.util.List value) { + validate(fields()[0], value); + this.address = value; + fieldSetFlags()[0] = true; + return this; + } + + /** Checks whether the 'address' field has been set */ + public boolean hasAddress() { + return fieldSetFlags()[0]; + } + + /** Clears the value of the 'address' field */ + public org.apache.hadoop.hive.hbase.avro.ContactInfo.Builder clearAddress() { + address = null; + fieldSetFlags()[0] = false; + return this; + } + + /** Gets the value of the 'homePhone' field */ + public org.apache.hadoop.hive.hbase.avro.HomePhone getHomePhone() { + return homePhone; + } + + /** Sets the value of the 'homePhone' field */ + public org.apache.hadoop.hive.hbase.avro.ContactInfo.Builder setHomePhone(org.apache.hadoop.hive.hbase.avro.HomePhone value) { + validate(fields()[1], value); + this.homePhone = value; + fieldSetFlags()[1] = true; + return this; + } + + /** Checks whether the 'homePhone' field has been set */ + public boolean hasHomePhone() { + return fieldSetFlags()[1]; + } + + /** Clears the value of the 'homePhone' field */ + public org.apache.hadoop.hive.hbase.avro.ContactInfo.Builder clearHomePhone() { + homePhone = null; + fieldSetFlags()[1] = false; + return this; + } + + /** Gets the value of the 'officePhone' field */ + public org.apache.hadoop.hive.hbase.avro.OfficePhone getOfficePhone() { + return officePhone; + } + + /** Sets the value of the 'officePhone' field */ + public org.apache.hadoop.hive.hbase.avro.ContactInfo.Builder setOfficePhone(org.apache.hadoop.hive.hbase.avro.OfficePhone value) 
{ + validate(fields()[2], value); + this.officePhone = value; + fieldSetFlags()[2] = true; + return this; + } + + /** Checks whether the 'officePhone' field has been set */ + public boolean hasOfficePhone() { + return fieldSetFlags()[2]; + } + + /** Clears the value of the 'officePhone' field */ + public org.apache.hadoop.hive.hbase.avro.ContactInfo.Builder clearOfficePhone() { + officePhone = null; + fieldSetFlags()[2] = false; + return this; + } + + @Override + public ContactInfo build() { + try { + ContactInfo record = new ContactInfo(); + record.address = fieldSetFlags()[0] ? this.address : (java.util.List) defaultValue(fields()[0]); + record.homePhone = fieldSetFlags()[1] ? this.homePhone : (org.apache.hadoop.hive.hbase.avro.HomePhone) defaultValue(fields()[1]); + record.officePhone = fieldSetFlags()[2] ? this.officePhone : (org.apache.hadoop.hive.hbase.avro.OfficePhone) defaultValue(fields()[2]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } +} diff --git hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/Employee.java hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/Employee.java new file mode 100644 index 0000000..6f40e9d --- /dev/null +++ hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/Employee.java @@ -0,0 +1,356 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.hadoop.hive.hbase.avro; +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public class Employee extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"org.apache.hadoop.hive.hbase.avro\",\"fields\":[{\"name\":\"employeeName\",\"type\":\"string\"},{\"name\":\"employeeID\",\"type\":\"long\"},{\"name\":\"age\",\"type\":\"long\"},{\"name\":\"gender\",\"type\":{\"type\":\"enum\",\"name\":\"Gender\",\"symbols\":[\"MALE\",\"FEMALE\"]}},{\"name\":\"contactInfo\",\"type\":{\"type\":\"record\",\"name\":\"ContactInfo\",\"fields\":[{\"name\":\"address\",\"type\":[{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Address\",\"fields\":[{\"name\":\"address1\",\"type\":\"string\"},{\"name\":\"address2\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"zipcode\",\"type\":\"long\"},{\"name\":\"county\",\"type\":[{\"type\":\"record\",\"name\":\"HomePhone\",\"fields\":[{\"name\":\"areaCode\",\"type\":\"long\"},{\"name\":\"number\",\"type\":\"long\"}]},{\"type\":\"record\",\"name\":\"OfficePhone\",\"fields\":[{\"name\":\"areaCode\",\"type\":\"long\"},{\"name\":\"number\",\"type\":\"long\"}]},\"string\",\"null\"]},{\"name\":\"aliases\",\"type\":[{\"type\":\"array\",\"items\":\"string\"},\"null\"]},{\"name\":\"metadata\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}]}]}},\"null\"]},{\"name\":\"homePhone\",\"type\":\"HomePhone\"},{\"name\":\"officePhone\",\"type\":\"OfficePhone\"}]}}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + @Deprecated public java.lang.CharSequence employeeName; + @Deprecated public long employeeID; + @Deprecated public long age; + @Deprecated public org.apache.hadoop.hive.hbase.avro.Gender gender; + @Deprecated public org.apache.hadoop.hive.hbase.avro.ContactInfo contactInfo; + + /** + * Default constructor. 
Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use {@link \#newBuilder()}. + */ + public Employee() {} + + /** + * All-args constructor. + */ + public Employee(java.lang.CharSequence employeeName, java.lang.Long employeeID, java.lang.Long age, org.apache.hadoop.hive.hbase.avro.Gender gender, org.apache.hadoop.hive.hbase.avro.ContactInfo contactInfo) { + this.employeeName = employeeName; + this.employeeID = employeeID; + this.age = age; + this.gender = gender; + this.contactInfo = contactInfo; + } + + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return employeeName; + case 1: return employeeID; + case 2: return age; + case 3: return gender; + case 4: return contactInfo; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: employeeName = (java.lang.CharSequence)value$; break; + case 1: employeeID = (java.lang.Long)value$; break; + case 2: age = (java.lang.Long)value$; break; + case 3: gender = (org.apache.hadoop.hive.hbase.avro.Gender)value$; break; + case 4: contactInfo = (org.apache.hadoop.hive.hbase.avro.ContactInfo)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'employeeName' field. + */ + public java.lang.CharSequence getEmployeeName() { + return employeeName; + } + + /** + * Sets the value of the 'employeeName' field. + * @param value the value to set. + */ + public void setEmployeeName(java.lang.CharSequence value) { + this.employeeName = value; + } + + /** + * Gets the value of the 'employeeID' field. + */ + public java.lang.Long getEmployeeID() { + return employeeID; + } + + /** + * Sets the value of the 'employeeID' field. + * @param value the value to set. + */ + public void setEmployeeID(java.lang.Long value) { + this.employeeID = value; + } + + /** + * Gets the value of the 'age' field. + */ + public java.lang.Long getAge() { + return age; + } + + /** + * Sets the value of the 'age' field. + * @param value the value to set. + */ + public void setAge(java.lang.Long value) { + this.age = value; + } + + /** + * Gets the value of the 'gender' field. + */ + public org.apache.hadoop.hive.hbase.avro.Gender getGender() { + return gender; + } + + /** + * Sets the value of the 'gender' field. + * @param value the value to set. + */ + public void setGender(org.apache.hadoop.hive.hbase.avro.Gender value) { + this.gender = value; + } + + /** + * Gets the value of the 'contactInfo' field. + */ + public org.apache.hadoop.hive.hbase.avro.ContactInfo getContactInfo() { + return contactInfo; + } + + /** + * Sets the value of the 'contactInfo' field. + * @param value the value to set. 
+ */ + public void setContactInfo(org.apache.hadoop.hive.hbase.avro.ContactInfo value) { + this.contactInfo = value; + } + + /** Creates a new Employee RecordBuilder */ + public static org.apache.hadoop.hive.hbase.avro.Employee.Builder newBuilder() { + return new org.apache.hadoop.hive.hbase.avro.Employee.Builder(); + } + + /** Creates a new Employee RecordBuilder by copying an existing Builder */ + public static org.apache.hadoop.hive.hbase.avro.Employee.Builder newBuilder(org.apache.hadoop.hive.hbase.avro.Employee.Builder other) { + return new org.apache.hadoop.hive.hbase.avro.Employee.Builder(other); + } + + /** Creates a new Employee RecordBuilder by copying an existing Employee instance */ + public static org.apache.hadoop.hive.hbase.avro.Employee.Builder newBuilder(org.apache.hadoop.hive.hbase.avro.Employee other) { + return new org.apache.hadoop.hive.hbase.avro.Employee.Builder(other); + } + + /** + * RecordBuilder for Employee instances. + */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private java.lang.CharSequence employeeName; + private long employeeID; + private long age; + private org.apache.hadoop.hive.hbase.avro.Gender gender; + private org.apache.hadoop.hive.hbase.avro.ContactInfo contactInfo; + + /** Creates a new Builder */ + private Builder() { + super(org.apache.hadoop.hive.hbase.avro.Employee.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(org.apache.hadoop.hive.hbase.avro.Employee.Builder other) { + super(other); + if (isValidValue(fields()[0], other.employeeName)) { + this.employeeName = data().deepCopy(fields()[0].schema(), other.employeeName); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.employeeID)) { + this.employeeID = data().deepCopy(fields()[1].schema(), other.employeeID); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.age)) { + this.age = data().deepCopy(fields()[2].schema(), other.age); + fieldSetFlags()[2] = true; + } + if (isValidValue(fields()[3], other.gender)) { + this.gender = data().deepCopy(fields()[3].schema(), other.gender); + fieldSetFlags()[3] = true; + } + if (isValidValue(fields()[4], other.contactInfo)) { + this.contactInfo = data().deepCopy(fields()[4].schema(), other.contactInfo); + fieldSetFlags()[4] = true; + } + } + + /** Creates a Builder by copying an existing Employee instance */ + private Builder(org.apache.hadoop.hive.hbase.avro.Employee other) { + super(org.apache.hadoop.hive.hbase.avro.Employee.SCHEMA$); + if (isValidValue(fields()[0], other.employeeName)) { + this.employeeName = data().deepCopy(fields()[0].schema(), other.employeeName); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.employeeID)) { + this.employeeID = data().deepCopy(fields()[1].schema(), other.employeeID); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.age)) { + this.age = data().deepCopy(fields()[2].schema(), other.age); + fieldSetFlags()[2] = true; + } + if (isValidValue(fields()[3], other.gender)) { + this.gender = data().deepCopy(fields()[3].schema(), other.gender); + fieldSetFlags()[3] = true; + } + if (isValidValue(fields()[4], other.contactInfo)) { + this.contactInfo = data().deepCopy(fields()[4].schema(), other.contactInfo); + fieldSetFlags()[4] = true; + } + } + + /** Gets the value of the 'employeeName' field */ + public java.lang.CharSequence getEmployeeName() { + return employeeName; + } + + /** Sets the value of 
the 'employeeName' field */ + public org.apache.hadoop.hive.hbase.avro.Employee.Builder setEmployeeName(java.lang.CharSequence value) { + validate(fields()[0], value); + this.employeeName = value; + fieldSetFlags()[0] = true; + return this; + } + + /** Checks whether the 'employeeName' field has been set */ + public boolean hasEmployeeName() { + return fieldSetFlags()[0]; + } + + /** Clears the value of the 'employeeName' field */ + public org.apache.hadoop.hive.hbase.avro.Employee.Builder clearEmployeeName() { + employeeName = null; + fieldSetFlags()[0] = false; + return this; + } + + /** Gets the value of the 'employeeID' field */ + public java.lang.Long getEmployeeID() { + return employeeID; + } + + /** Sets the value of the 'employeeID' field */ + public org.apache.hadoop.hive.hbase.avro.Employee.Builder setEmployeeID(long value) { + validate(fields()[1], value); + this.employeeID = value; + fieldSetFlags()[1] = true; + return this; + } + + /** Checks whether the 'employeeID' field has been set */ + public boolean hasEmployeeID() { + return fieldSetFlags()[1]; + } + + /** Clears the value of the 'employeeID' field */ + public org.apache.hadoop.hive.hbase.avro.Employee.Builder clearEmployeeID() { + fieldSetFlags()[1] = false; + return this; + } + + /** Gets the value of the 'age' field */ + public java.lang.Long getAge() { + return age; + } + + /** Sets the value of the 'age' field */ + public org.apache.hadoop.hive.hbase.avro.Employee.Builder setAge(long value) { + validate(fields()[2], value); + this.age = value; + fieldSetFlags()[2] = true; + return this; + } + + /** Checks whether the 'age' field has been set */ + public boolean hasAge() { + return fieldSetFlags()[2]; + } + + /** Clears the value of the 'age' field */ + public org.apache.hadoop.hive.hbase.avro.Employee.Builder clearAge() { + fieldSetFlags()[2] = false; + return this; + } + + /** Gets the value of the 'gender' field */ + public org.apache.hadoop.hive.hbase.avro.Gender getGender() { + return gender; + } + + /** Sets the value of the 'gender' field */ + public org.apache.hadoop.hive.hbase.avro.Employee.Builder setGender(org.apache.hadoop.hive.hbase.avro.Gender value) { + validate(fields()[3], value); + this.gender = value; + fieldSetFlags()[3] = true; + return this; + } + + /** Checks whether the 'gender' field has been set */ + public boolean hasGender() { + return fieldSetFlags()[3]; + } + + /** Clears the value of the 'gender' field */ + public org.apache.hadoop.hive.hbase.avro.Employee.Builder clearGender() { + gender = null; + fieldSetFlags()[3] = false; + return this; + } + + /** Gets the value of the 'contactInfo' field */ + public org.apache.hadoop.hive.hbase.avro.ContactInfo getContactInfo() { + return contactInfo; + } + + /** Sets the value of the 'contactInfo' field */ + public org.apache.hadoop.hive.hbase.avro.Employee.Builder setContactInfo(org.apache.hadoop.hive.hbase.avro.ContactInfo value) { + validate(fields()[4], value); + this.contactInfo = value; + fieldSetFlags()[4] = true; + return this; + } + + /** Checks whether the 'contactInfo' field has been set */ + public boolean hasContactInfo() { + return fieldSetFlags()[4]; + } + + /** Clears the value of the 'contactInfo' field */ + public org.apache.hadoop.hive.hbase.avro.Employee.Builder clearContactInfo() { + contactInfo = null; + fieldSetFlags()[4] = false; + return this; + } + + @Override + public Employee build() { + try { + Employee record = new Employee(); + record.employeeName = fieldSetFlags()[0] ? 
this.employeeName : (java.lang.CharSequence) defaultValue(fields()[0]); + record.employeeID = fieldSetFlags()[1] ? this.employeeID : (java.lang.Long) defaultValue(fields()[1]); + record.age = fieldSetFlags()[2] ? this.age : (java.lang.Long) defaultValue(fields()[2]); + record.gender = fieldSetFlags()[3] ? this.gender : (org.apache.hadoop.hive.hbase.avro.Gender) defaultValue(fields()[3]); + record.contactInfo = fieldSetFlags()[4] ? this.contactInfo : (org.apache.hadoop.hive.hbase.avro.ContactInfo) defaultValue(fields()[4]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } +} diff --git hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/EmployeeAvro.java hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/EmployeeAvro.java new file mode 100644 index 0000000..1d4f966 --- /dev/null +++ hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/EmployeeAvro.java @@ -0,0 +1,17 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.hadoop.hive.hbase.avro; + +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public interface EmployeeAvro { + public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"EmployeeAvro\",\"namespace\":\"org.apache.hadoop.hive.hbase.avro\",\"types\":[{\"type\":\"enum\",\"name\":\"Gender\",\"symbols\":[\"MALE\",\"FEMALE\"]},{\"type\":\"record\",\"name\":\"HomePhone\",\"fields\":[{\"name\":\"areaCode\",\"type\":\"long\"},{\"name\":\"number\",\"type\":\"long\"}]},{\"type\":\"record\",\"name\":\"OfficePhone\",\"fields\":[{\"name\":\"areaCode\",\"type\":\"long\"},{\"name\":\"number\",\"type\":\"long\"}]},{\"type\":\"record\",\"name\":\"Address\",\"fields\":[{\"name\":\"address1\",\"type\":\"string\"},{\"name\":\"address2\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"zipcode\",\"type\":\"long\"},{\"name\":\"county\",\"type\":[\"HomePhone\",\"OfficePhone\",\"string\",\"null\"]},{\"name\":\"aliases\",\"type\":[{\"type\":\"array\",\"items\":\"string\"},\"null\"]},{\"name\":\"metadata\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}]}]},{\"type\":\"record\",\"name\":\"ContactInfo\",\"fields\":[{\"name\":\"address\",\"type\":[{\"type\":\"array\",\"items\":\"Address\"},\"null\"]},{\"name\":\"homePhone\",\"type\":\"HomePhone\"},{\"name\":\"officePhone\",\"type\":\"OfficePhone\"}]},{\"type\":\"record\",\"name\":\"Employee\",\"fields\":[{\"name\":\"employeeName\",\"type\":\"string\"},{\"name\":\"employeeID\",\"type\":\"long\"},{\"name\":\"age\",\"type\":\"long\"},{\"name\":\"gender\",\"type\":\"Gender\"},{\"name\":\"contactInfo\",\"type\":\"ContactInfo\"}]}],\"messages\":{}}"); + + @SuppressWarnings("all") + public interface Callback extends EmployeeAvro { + public static final org.apache.avro.Protocol PROTOCOL = org.apache.hadoop.hive.hbase.avro.EmployeeAvro.PROTOCOL; + } +} diff --git hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/Gender.java hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/Gender.java new file mode 100644 index 0000000..ed2394b --- /dev/null +++ hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/Gender.java @@ -0,0 +1,13 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.hadoop.hive.hbase.avro; +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public enum Gender { + MALE, FEMALE ; + public static final 
org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Gender\",\"namespace\":\"org.apache.hadoop.hive.hbase.avro\",\"symbols\":[\"MALE\",\"FEMALE\"]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } +} diff --git hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/HomePhone.java hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/HomePhone.java new file mode 100644 index 0000000..bb14e5e --- /dev/null +++ hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/HomePhone.java @@ -0,0 +1,194 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.hadoop.hive.hbase.avro; +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public class HomePhone extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"HomePhone\",\"namespace\":\"org.apache.hadoop.hive.hbase.avro\",\"fields\":[{\"name\":\"areaCode\",\"type\":\"long\"},{\"name\":\"number\",\"type\":\"long\"}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + @Deprecated public long areaCode; + @Deprecated public long number; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use {@link \#newBuilder()}. + */ + public HomePhone() {} + + /** + * All-args constructor. + */ + public HomePhone(java.lang.Long areaCode, java.lang.Long number) { + this.areaCode = areaCode; + this.number = number; + } + + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return areaCode; + case 1: return number; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: areaCode = (java.lang.Long)value$; break; + case 1: number = (java.lang.Long)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'areaCode' field. + */ + public java.lang.Long getAreaCode() { + return areaCode; + } + + /** + * Sets the value of the 'areaCode' field. + * @param value the value to set. + */ + public void setAreaCode(java.lang.Long value) { + this.areaCode = value; + } + + /** + * Gets the value of the 'number' field. + */ + public java.lang.Long getNumber() { + return number; + } + + /** + * Sets the value of the 'number' field. + * @param value the value to set. 
+ */ + public void setNumber(java.lang.Long value) { + this.number = value; + } + + /** Creates a new HomePhone RecordBuilder */ + public static org.apache.hadoop.hive.hbase.avro.HomePhone.Builder newBuilder() { + return new org.apache.hadoop.hive.hbase.avro.HomePhone.Builder(); + } + + /** Creates a new HomePhone RecordBuilder by copying an existing Builder */ + public static org.apache.hadoop.hive.hbase.avro.HomePhone.Builder newBuilder(org.apache.hadoop.hive.hbase.avro.HomePhone.Builder other) { + return new org.apache.hadoop.hive.hbase.avro.HomePhone.Builder(other); + } + + /** Creates a new HomePhone RecordBuilder by copying an existing HomePhone instance */ + public static org.apache.hadoop.hive.hbase.avro.HomePhone.Builder newBuilder(org.apache.hadoop.hive.hbase.avro.HomePhone other) { + return new org.apache.hadoop.hive.hbase.avro.HomePhone.Builder(other); + } + + /** + * RecordBuilder for HomePhone instances. + */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private long areaCode; + private long number; + + /** Creates a new Builder */ + private Builder() { + super(org.apache.hadoop.hive.hbase.avro.HomePhone.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(org.apache.hadoop.hive.hbase.avro.HomePhone.Builder other) { + super(other); + if (isValidValue(fields()[0], other.areaCode)) { + this.areaCode = data().deepCopy(fields()[0].schema(), other.areaCode); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.number)) { + this.number = data().deepCopy(fields()[1].schema(), other.number); + fieldSetFlags()[1] = true; + } + } + + /** Creates a Builder by copying an existing HomePhone instance */ + private Builder(org.apache.hadoop.hive.hbase.avro.HomePhone other) { + super(org.apache.hadoop.hive.hbase.avro.HomePhone.SCHEMA$); + if (isValidValue(fields()[0], other.areaCode)) { + this.areaCode = data().deepCopy(fields()[0].schema(), other.areaCode); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.number)) { + this.number = data().deepCopy(fields()[1].schema(), other.number); + fieldSetFlags()[1] = true; + } + } + + /** Gets the value of the 'areaCode' field */ + public java.lang.Long getAreaCode() { + return areaCode; + } + + /** Sets the value of the 'areaCode' field */ + public org.apache.hadoop.hive.hbase.avro.HomePhone.Builder setAreaCode(long value) { + validate(fields()[0], value); + this.areaCode = value; + fieldSetFlags()[0] = true; + return this; + } + + /** Checks whether the 'areaCode' field has been set */ + public boolean hasAreaCode() { + return fieldSetFlags()[0]; + } + + /** Clears the value of the 'areaCode' field */ + public org.apache.hadoop.hive.hbase.avro.HomePhone.Builder clearAreaCode() { + fieldSetFlags()[0] = false; + return this; + } + + /** Gets the value of the 'number' field */ + public java.lang.Long getNumber() { + return number; + } + + /** Sets the value of the 'number' field */ + public org.apache.hadoop.hive.hbase.avro.HomePhone.Builder setNumber(long value) { + validate(fields()[1], value); + this.number = value; + fieldSetFlags()[1] = true; + return this; + } + + /** Checks whether the 'number' field has been set */ + public boolean hasNumber() { + return fieldSetFlags()[1]; + } + + /** Clears the value of the 'number' field */ + public org.apache.hadoop.hive.hbase.avro.HomePhone.Builder clearNumber() { + fieldSetFlags()[1] = false; + return this; + } + + @Override + public 
HomePhone build() { + try { + HomePhone record = new HomePhone(); + record.areaCode = fieldSetFlags()[0] ? this.areaCode : (java.lang.Long) defaultValue(fields()[0]); + record.number = fieldSetFlags()[1] ? this.number : (java.lang.Long) defaultValue(fields()[1]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } +} diff --git hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/Magic.java hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/Magic.java new file mode 100644 index 0000000..695bdd9 --- /dev/null +++ hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/Magic.java @@ -0,0 +1,23 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.hadoop.hive.hbase.avro; +@SuppressWarnings("all") +@org.apache.avro.specific.FixedSize(4) +@org.apache.avro.specific.AvroGenerated +public class Magic extends org.apache.avro.specific.SpecificFixed { + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"fixed\",\"name\":\"Magic\",\"namespace\":\"org.apache.hadoop.hive.hbase.avro\",\"size\":4}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + /** Creates a new Magic */ + public Magic() { + super(); + } + + /** Creates a new Magic with the given bytes */ + public Magic(byte[] bytes) { + super(bytes); + } +} diff --git hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/OfficePhone.java hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/OfficePhone.java new file mode 100644 index 0000000..3f7b769 --- /dev/null +++ hbase-handler/src/gen/avro/gen-java/org/apache/hadoop/hive/hbase/avro/OfficePhone.java @@ -0,0 +1,194 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.hadoop.hive.hbase.avro; +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public class OfficePhone extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"OfficePhone\",\"namespace\":\"org.apache.hadoop.hive.hbase.avro\",\"fields\":[{\"name\":\"areaCode\",\"type\":\"long\"},{\"name\":\"number\",\"type\":\"long\"}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + @Deprecated public long areaCode; + @Deprecated public long number; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use {@link \#newBuilder()}. + */ + public OfficePhone() {} + + /** + * All-args constructor. + */ + public OfficePhone(java.lang.Long areaCode, java.lang.Long number) { + this.areaCode = areaCode; + this.number = number; + } + + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return areaCode; + case 1: return number; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. 
+ @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: areaCode = (java.lang.Long)value$; break; + case 1: number = (java.lang.Long)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'areaCode' field. + */ + public java.lang.Long getAreaCode() { + return areaCode; + } + + /** + * Sets the value of the 'areaCode' field. + * @param value the value to set. + */ + public void setAreaCode(java.lang.Long value) { + this.areaCode = value; + } + + /** + * Gets the value of the 'number' field. + */ + public java.lang.Long getNumber() { + return number; + } + + /** + * Sets the value of the 'number' field. + * @param value the value to set. + */ + public void setNumber(java.lang.Long value) { + this.number = value; + } + + /** Creates a new OfficePhone RecordBuilder */ + public static org.apache.hadoop.hive.hbase.avro.OfficePhone.Builder newBuilder() { + return new org.apache.hadoop.hive.hbase.avro.OfficePhone.Builder(); + } + + /** Creates a new OfficePhone RecordBuilder by copying an existing Builder */ + public static org.apache.hadoop.hive.hbase.avro.OfficePhone.Builder newBuilder(org.apache.hadoop.hive.hbase.avro.OfficePhone.Builder other) { + return new org.apache.hadoop.hive.hbase.avro.OfficePhone.Builder(other); + } + + /** Creates a new OfficePhone RecordBuilder by copying an existing OfficePhone instance */ + public static org.apache.hadoop.hive.hbase.avro.OfficePhone.Builder newBuilder(org.apache.hadoop.hive.hbase.avro.OfficePhone other) { + return new org.apache.hadoop.hive.hbase.avro.OfficePhone.Builder(other); + } + + /** + * RecordBuilder for OfficePhone instances. + */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private long areaCode; + private long number; + + /** Creates a new Builder */ + private Builder() { + super(org.apache.hadoop.hive.hbase.avro.OfficePhone.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(org.apache.hadoop.hive.hbase.avro.OfficePhone.Builder other) { + super(other); + if (isValidValue(fields()[0], other.areaCode)) { + this.areaCode = data().deepCopy(fields()[0].schema(), other.areaCode); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.number)) { + this.number = data().deepCopy(fields()[1].schema(), other.number); + fieldSetFlags()[1] = true; + } + } + + /** Creates a Builder by copying an existing OfficePhone instance */ + private Builder(org.apache.hadoop.hive.hbase.avro.OfficePhone other) { + super(org.apache.hadoop.hive.hbase.avro.OfficePhone.SCHEMA$); + if (isValidValue(fields()[0], other.areaCode)) { + this.areaCode = data().deepCopy(fields()[0].schema(), other.areaCode); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.number)) { + this.number = data().deepCopy(fields()[1].schema(), other.number); + fieldSetFlags()[1] = true; + } + } + + /** Gets the value of the 'areaCode' field */ + public java.lang.Long getAreaCode() { + return areaCode; + } + + /** Sets the value of the 'areaCode' field */ + public org.apache.hadoop.hive.hbase.avro.OfficePhone.Builder setAreaCode(long value) { + validate(fields()[0], value); + this.areaCode = value; + fieldSetFlags()[0] = true; + return this; + } + + /** Checks whether the 'areaCode' field has been set */ + public boolean hasAreaCode() { + return fieldSetFlags()[0]; + } + + /** Clears 
the value of the 'areaCode' field */ + public org.apache.hadoop.hive.hbase.avro.OfficePhone.Builder clearAreaCode() { + fieldSetFlags()[0] = false; + return this; + } + + /** Gets the value of the 'number' field */ + public java.lang.Long getNumber() { + return number; + } + + /** Sets the value of the 'number' field */ + public org.apache.hadoop.hive.hbase.avro.OfficePhone.Builder setNumber(long value) { + validate(fields()[1], value); + this.number = value; + fieldSetFlags()[1] = true; + return this; + } + + /** Checks whether the 'number' field has been set */ + public boolean hasNumber() { + return fieldSetFlags()[1]; + } + + /** Clears the value of the 'number' field */ + public org.apache.hadoop.hive.hbase.avro.OfficePhone.Builder clearNumber() { + fieldSetFlags()[1] = false; + return this; + } + + @Override + public OfficePhone build() { + try { + OfficePhone record = new OfficePhone(); + record.areaCode = fieldSetFlags()[0] ? this.areaCode : (java.lang.Long) defaultValue(fields()[0]); + record.number = fieldSetFlags()[1] ? this.number : (java.lang.Long) defaultValue(fields()[1]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } +} diff --git hbase-handler/src/if/avro/avro_test.avpr hbase-handler/src/if/avro/avro_test.avpr new file mode 100644 index 0000000..86f7fce --- /dev/null +++ hbase-handler/src/if/avro/avro_test.avpr @@ -0,0 +1,144 @@ +{ +"protocol": "EmployeeAvro", +"namespace": "org.apache.hadoop.hive.hbase.avro", +"types": [ +{ +"type": "enum", +"name": "Gender", +"symbols": [ +"MALE", +"FEMALE" +] +}, +{ +"type": "record", +"name": "HomePhone", +"fields": [ +{ +"name": "areaCode", +"type": "long" +}, +{ +"name": "number", +"type": "long" +} +] +}, +{ +"type": "record", +"name": "OfficePhone", +"fields": [ +{ +"name": "areaCode", +"type": "long" +}, +{ +"name": "number", +"type": "long" +} +] +}, +{ +"type": "record", +"name": "Address", +"fields": [ +{ +"name": "address1", +"type": "string" +}, +{ +"name": "address2", +"type": "string" +}, +{ +"name": "city", +"type": "string" +}, +{ +"name": "zipcode", +"type": "long" +}, +{ +"name": "county", +"type": [ +"HomePhone", +"OfficePhone", +"string", +"null" +] +}, +{ +"name": "aliases", +"type": [ +{ +"type": "array", +"items": "string" +}, +"null" +] +}, +{ +"name": "metadata", +"type": [ +"null", +{ +"type": "map", +"values": "string" +} +] +} +] +}, +{ +"type": "record", +"name": "ContactInfo", +"fields": [ +{ +"name": "address", +"type": [ +{ +"type": "array", +"items": "Address" +}, +"null" +] +}, +{ +"name": "homePhone", +"type": "HomePhone" +}, +{ +"name": "officePhone", +"type": "OfficePhone" +} +] +}, +{ +"type": "record", +"name": "Employee", +"fields": [ +{ +"name": "employeeName", +"type": "string" +}, +{ +"name": "employeeID", +"type": "long" +}, +{ +"name": "age", +"type": "long" +}, +{ +"name": "gender", +"type": "Gender" +}, +{ +"name": "contactInfo", +"type": "ContactInfo" +} +] +} +], +"messages": { } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java index 5008f15..d448292 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java @@ -19,10 +19,10 @@ package org.apache.hadoop.hive.hbase; import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.Properties; +import java.util.Map; -import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.LazyFactory; import org.apache.hadoop.hive.serde2.lazy.LazyObject; @@ -42,16 +42,16 @@ * for the above example, the value returned for getField(0) should be part1, * getField(1) should be part2 and getField(2) should be part3. *

- * + * *

* All custom implementations are expected to have a constructor of the form: - * + *

  * MyCustomCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl, Configuration conf)
  * 
* *

- * + * * */ public class HBaseCompositeKey extends LazyStruct { @@ -73,8 +73,18 @@ public HBaseCompositeKey(LazySimpleStructObjectInspector oi) { } /** + * Return the different parts of the key. By default, this returns an empty map. + * Consumers can override this to provide the names and types of the parts of their key. + * + * @return a map from part name to part type + * */ + public Map getParts() { + return Collections.emptyMap(); + } + + /** * Create and initialize a {@link LazyObject} with the given bytes for the given fieldID. - * + * * @param fieldID * field for which the object is to be created * @param bytes diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java index 2cd65cb..88da359 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java @@ -21,17 +21,27 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.avro.Schema; +import org.apache.avro.reflect.ReflectData; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.avro.AvroLazyObjectInspector; +import org.apache.hadoop.hive.serde2.avro.AvroSchemaRetriever; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; import org.apache.hadoop.hive.serde2.lazy.LazyFactory; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; @@ -41,6 +51,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; @@ -48,11 +59,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; /** @@ -66,7 +72,9 @@ public static final String HBASE_TABLE_DEFAULT_STORAGE_TYPE = "hbase.table.default.storage.type"; public static final String HBASE_KEY_COL = ":key"; public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp"; + public static final String HBASE_AUTOGENERATE_STRUCT = "hbase.struct.autogenerate"; public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class";
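As a quick illustration of how the two composite-key hooks fit together, here is a minimal sketch (the class, part names, and types below are hypothetical and not part of this patch): a custom key class can either rely on the hbase.composite.key.types property, or describe its own parts by overriding getParts().

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.hbase.HBaseCompositeKey;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;

// Hypothetical example; assumes a row key of the form "<country>_<storeid>".
public class SampleCompositeKey extends HBaseCompositeKey {

  // Custom implementations must expose this three-argument constructor.
  public SampleCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl, Configuration conf) {
    super(oi);
  }

  // Describing the parts here makes hbase.composite.key.types unnecessary:
  // generateColumnTypes() turns this map into struct<country:string,storeid:int>.
  @Override
  public Map<String, String> getParts() {
    Map<String, String> parts = new LinkedHashMap<String, String>();
    parts.put("country", "string");
    parts.put("storeid", "int");
    return parts;
  }
}

A LinkedHashMap keeps the parts in row-key order, since generateKeyStruct(...) emits the struct fields in entry-set order.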
+ public static final String HBASE_COMPOSITE_KEY_TYPES = "hbase.composite.key.types"; public static final String HBASE_SCAN_CACHE = "hbase.scan.cache"; public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock"; public static final String HBASE_SCAN_BATCH = "hbase.scan.batch"; @@ -75,6 +83,15 @@ * WARNING: Note that currently this only supports the suffix wildcard .* **/ public static final String HBASE_COLUMNS_REGEX_MATCHING = "hbase.columns.mapping.regex.matching"; + /** + * Defines the serialization type for a column. A null value falls back to the default type, JAVA. + * */ + public static final String SERIALIZATION_TYPE = "serialization.type"; + + /** + * AVRO serialization type + * */ + public static final String AVRO_SERIALIZATION_TYPE = "avro"; + public static final Log LOG = LogFactory.getLog(HBaseSerDe.class); private ObjectInspector cachedObjectInspector; @@ -89,6 +106,8 @@ private long putTimestamp; private Class compositeKeyClass; private Object compositeKeyObj; + private Class avroSchemaRetrieverClass; + private AvroSchemaRetriever avroSchemaRetriever; // used for serializing a field private byte [] separators; // the separators array @@ -97,6 +116,17 @@ private boolean [] needsEscape; // which chars need to be escaped. This array should have size // of 128. Negative byte values (or byte values >= 128) are // never escaped. + + /** + * Map from a column to its corresponding data type, i.e. JAVA or AVRO + * */ + private final Map columnToDataType = new HashMap(); + + /** + * Map from a column to its corresponding schema + * */ + private final Map avroColumnToSchema = new HashMap(); + @Override public String toString() { return getClass().toString() @@ -121,8 +151,37 @@ public HBaseSerDe() throws SerDeException { public void initialize(Configuration conf, Properties tbl) throws SerDeException { + boolean hasSerializedStruct = false; + initHBaseSerDeParameters(conf, tbl, getClass().getName()); + // Read all columns to get their corresponding deserializers + String[] columns = tbl.getProperty(serdeConstants.LIST_COLUMNS).split(","); + + if (columns.length != columnsMapping.size()) { + throw new SerDeException("Number of columns does not match the column mapping"); + } + + try { + for (int i = 0; i < columnsMapping.size(); i++) { + + ColumnProperties colProps = validateAndSetConf(conf, tbl, columnsMapping.get(i)); + + if (colProps.getType() != ObjectInspectorOptions.JAVA) { + // we have found a struct that is either a protobuf, thrift or avro + hasSerializedStruct = true; + } + + columnToDataType.put(columns[i], colProps.getType()); + + if (colProps.getType() == ObjectInspectorOptions.AVRO) { + avroColumnToSchema.put(columns[i], colProps.getSchema()); + } + } + } catch (Exception e) { + throw new SerDeException(e); + } + cachedObjectInspector = LazyFactory.createLazyStructInspector( serdeParams.getColumnNames(), serdeParams.getColumnTypes(), @@ -130,7 +189,7 @@ public void initialize(Configuration conf, Properties tbl) serdeParams.getNullSequence(), serdeParams.isLastColumnTakesRest(), serdeParams.isEscaped(), - serdeParams.getEscapeChar()); + serdeParams.getEscapeChar(), columnToDataType); cachedHBaseRow = new LazyHBaseRow( (LazySimpleStructObjectInspector) cachedObjectInspector); @@ -140,6 +199,16 @@ public void initialize(Configuration conf, Properties tbl) initCompositeKeyClass(conf,tbl); } + if (avroSchemaRetrieverClass != null) { + // initialize the avro schema retriever class + initAvroSchemaRetriever(conf, tbl); + } + + if (hasSerializedStruct) { + // initialize the internal object inspectors
if we have found a serialized struct + initInternalObjectInspectors(); + } + if (LOG.isDebugEnabled()) { LOG.debug("HBaseSerDe initialized with : columnNames = " + serdeParams.getColumnNames() @@ -464,12 +533,15 @@ private void initHBaseSerDeParameters( // Read configuration parameters hbaseColumnsMapping = tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING); + String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); + String autogenerate = tbl.getProperty(HBaseSerDe.HBASE_AUTOGENERATE_STRUCT); putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,"-1")); doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBASE_COLUMNS_REGEX_MATCHING, "true")); String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS); + String avroSchemaRetClass = tbl.getProperty(AvroSerdeUtils.SCHEMA_RETRIEVER); if (compKeyClass != null) { try { @@ -479,33 +551,60 @@ private void initHBaseSerDeParameters( } } + if (avroSchemaRetClass != null) { + try { + avroSchemaRetrieverClass = job.getClassByName(avroSchemaRetClass); + } catch (ClassNotFoundException e) { + throw new SerDeException(e); + } + } + // Parse and initialize the HBase columns mapping columnsMapping = parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); // Build the type property string if not supplied - if (columnTypeProperty == null) { - StringBuilder sb = new StringBuilder(); + if (columnTypeProperty == null || columnTypeProperty.isEmpty()) { + StringBuilder columns = new StringBuilder(); + StringBuilder columnTypes = new StringBuilder(); - for (int i = 0; i < columnsMapping.size(); i++) { - if (sb.length() > 0) { - sb.append(":"); + if (columnNameProperty == null || columnNameProperty.isEmpty()) { + + if (autogenerate == null || autogenerate.isEmpty()) { + throw new IllegalArgumentException("Either the columns must be specified or the " + HBaseSerDe.HBASE_AUTOGENERATE_STRUCT + + " property must be set to true."); } - ColumnMapping colMap = columnsMapping.get(i); + if (autogenerate.equals("true")) { + // auto generate columns + HBaseSerDeHelper.generateColumns(tbl, columnsMapping, columns); + // auto generate column types + HBaseSerDeHelper.generateColumnTypes(tbl, columnsMapping, columnTypes, job); + // set the columns back + tbl.setProperty(serdeConstants.LIST_COLUMNS, columns.toString()); + } + } else { + for (int i = 0; i < columnsMapping.size(); i++) { + if (columnTypes.length() > 0) { + columnTypes.append(":"); + } - if (colMap.hbaseRowKey) { - // the row key column becomes a STRING - sb.append(serdeConstants.STRING_TYPE_NAME); - } else if (colMap.qualifierName == null) { - // a column family become a MAP - sb.append(serdeConstants.MAP_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + "," - + serdeConstants.STRING_TYPE_NAME + ">"); - } else { - // an individual column becomes a STRING - sb.append(serdeConstants.STRING_TYPE_NAME); + ColumnMapping colMap = columnsMapping.get(i); + + if (colMap.hbaseRowKey) { + // the row key column becomes a STRING + columnTypes.append(serdeConstants.STRING_TYPE_NAME); + } else if (colMap.qualifierName == null) { + // a column family becomes a MAP + columnTypes.append(serdeConstants.MAP_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + "," + + serdeConstants.STRING_TYPE_NAME + ">"); + } else { + // an individual column becomes a STRING + columnTypes.append(serdeConstants.STRING_TYPE_NAME); + } } } - tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, sb.toString()); + +
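+ // Illustration with hypothetical values: for the mapping ":key,data:frame", with
+ // "data.frame.serialization.type" set to "avro" and a schema literal supplied, the
+ // helpers above fill LIST_COLUMNS with "key,data_frame" and build a LIST_COLUMN_TYPES
+ // value of the form "string:struct<...>", the struct being derived from the Avro schema.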
+ tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString()); } serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName); @@ -822,6 +921,11 @@ private boolean serialize( } return true; } + + case UNION: { + // the union type is not yet fully supported. See HIVE-2390 + return false; + } } throw new RuntimeException("Unknown category type: " + objInspector.getCategory()); @@ -870,6 +974,110 @@ private void initCompositeKeyClass(Configuration conf,Properties tbl) throws Ser } /** + * Initialize the {@link AvroSchemaRetriever} instance + * + * @throws SerDeException + * */ + private void initAvroSchemaRetriever(Configuration conf, Properties tbl) throws SerDeException { + + try { + avroSchemaRetriever = (AvroSchemaRetriever) avroSchemaRetrieverClass.getDeclaredConstructor( + Configuration.class, Properties.class).newInstance( + conf, tbl); + } catch (IllegalArgumentException e) { + throw new SerDeException(e); + } catch (SecurityException e) { + throw new SerDeException(e); + } catch (InstantiationException e) { + throw new SerDeException(e); + } catch (IllegalAccessException e) { + throw new SerDeException(e); + } catch (InvocationTargetException e) { + throw new SerDeException(e); + } catch (NoSuchMethodException e) { + // the expected constructor wasn't defined in the implementation class; flag an error + throw new SerDeException("Constructor not defined in schema retriever class [" + avroSchemaRetrieverClass.getName() + "]", e); + } + } + + /** + * Initialize the internal object inspectors + * */ + private void initInternalObjectInspectors() { + List allFields = ((StructObjectInspector) cachedObjectInspector).getAllStructFieldRefs(); + + // A new object inspector is created for each column and for each of its nested structs. + // For this reason, we recursively walk the nested object inspectors and set the required + // properties on each of them, so that every one can perform its part of the deserialization. + for (StructField field : allFields) { + ObjectInspector oi = field.getFieldObjectInspector(); + + if (oi instanceof AvroLazyObjectInspector) { + initAvroObjectInspector(oi, field); + } else if (oi instanceof MapObjectInspector) { + // we found a map object inspector; grab the object inspector for its value + // and initialize it as well + ObjectInspector valueOI = ((MapObjectInspector) oi).getMapValueObjectInspector(); + + if (valueOI instanceof AvroLazyObjectInspector) { + initAvroObjectInspector(valueOI, field); + } + } + } + } + + /** + * Recursively initialize the avro object inspector and all its nested object inspectors with the table schema + * + * @param oi + * ObjectInspector to be recursively initialized + * @param field + * initialize with the schema for the given field + * */ + private void initAvroObjectInspector(ObjectInspector oi, StructField field) { + Schema schema = avroColumnToSchema.get(field.getFieldName()); + initAvroObjectInspector(oi, schema); + } + + /** + * Recursively initialize the {@link AvroLazyObjectInspector} and all its nested object inspectors + * + * @param oi + * ObjectInspector to be recursively initialized + * @param schema + * {@link Schema} to be initialized with + * */ + private void initAvroObjectInspector(ObjectInspector oi, Schema schema) { + // Check for a list.
If found, recursively init its members + if (oi instanceof ListObjectInspector) { + ListObjectInspector loi = (ListObjectInspector) oi; + + initAvroObjectInspector(loi.getListElementObjectInspector(), schema); + return; + } + + // Check for a nested message. If found, set the schema, else return. + if (!(oi instanceof AvroLazyObjectInspector)) { + return; + } + + AvroLazyObjectInspector aoi = (AvroLazyObjectInspector) oi; + + aoi.setSchemaRetriever(avroSchemaRetriever); + aoi.setReaderSchema(schema); + + // call the method recursively over all the internal fields of the given avro + // objectinspector + for (StructField field : aoi.getAllStructFieldRefs()) { + initAvroObjectInspector(field.getFieldObjectInspector(), schema); + } + } + + /** * @return the useJSONSerialize */ public boolean isUseJSONSerialize() { @@ -918,4 +1126,149 @@ public static int getRowKeyColumnOffset(List columnsMapping) throw new SerDeException("HBaseSerDe Error: columns mapping list does not contain" + " row key column."); } + + /** + * Validate the configuration set for a given column mapping and return a {@link ColumnProperties} object defining the properties + * for that specific mapping + * + * @param conf + * the hbase configuration + * @param tbl + * properties for the table + * @param colMap + * given hbase column mapping + * @throws Exception + * if there was an error validating the configuration + * */ + private ColumnProperties validateAndSetConf(Configuration conf, Properties tbl, ColumnMapping colMap) throws Exception { + + String serType = null; + String serClassName = null; + String protocolName = null; + String schemaLiteral = null; + String schemaUrl = null; + + if (colMap.qualifierName == null) { + // only a column family + + if (colMap.qualifierPrefix != null) { + serType = tbl.getProperty(colMap.familyName + "." + colMap.qualifierPrefix + "." + SERIALIZATION_TYPE); + + serClassName = tbl.getProperty(colMap.familyName + "." + colMap.qualifierPrefix + "." + serdeConstants.SERIALIZATION_CLASS); + + protocolName = tbl + .getProperty(colMap.familyName + "." + colMap.qualifierPrefix + "." + serdeConstants.SERIALIZATION_FORMAT); + + schemaLiteral = tbl.getProperty(colMap.familyName + "." + colMap.qualifierPrefix + "." + AvroSerdeUtils.SCHEMA_LITERAL); + + schemaUrl = tbl.getProperty(colMap.familyName + "." + colMap.qualifierPrefix + "."+ AvroSerdeUtils.SCHEMA_URL); + } else { + serType = tbl.getProperty(colMap.familyName + "." + SERIALIZATION_TYPE); + + serClassName = tbl.getProperty(colMap.familyName + "." + serdeConstants.SERIALIZATION_CLASS); + + protocolName = tbl.getProperty(colMap.familyName + "." + serdeConstants.SERIALIZATION_FORMAT); + + schemaLiteral = tbl.getProperty(colMap.familyName + "." + AvroSerdeUtils.SCHEMA_LITERAL); + + schemaUrl = tbl.getProperty(colMap.familyName + "." + AvroSerdeUtils.SCHEMA_URL); + } + } else if (!colMap.hbaseRowKey) { + // not an hbase row key. This should either be a prefix or an individual qualifier + String qualifierName = colMap.qualifierName; + + if (colMap.qualifierName.endsWith("*")) { + qualifierName = colMap.qualifierName.substring(0, colMap.qualifierName.length() - 1); + } + + serType = tbl.getProperty(colMap.familyName + "." + qualifierName + "." + SERIALIZATION_TYPE); + + serClassName = tbl.getProperty(colMap.familyName + "." + qualifierName + "." + serdeConstants.SERIALIZATION_CLASS); + + protocolName = tbl.getProperty(colMap.familyName + "." + qualifierName + "." 
+ serdeConstants.SERIALIZATION_FORMAT); + + schemaLiteral = tbl.getProperty(colMap.familyName + "." + qualifierName + "." + AvroSerdeUtils.SCHEMA_LITERAL); + + schemaUrl = tbl.getProperty(colMap.familyName + "." + qualifierName + "." + AvroSerdeUtils.SCHEMA_URL); + } + + ObjectInspectorOptions option = null; + + if (serType == null) { + if (serClassName != null || protocolName != null) { + throw new IllegalArgumentException("serialization.type was not set for column family [" + colMap.familyName + + "] and qualifier [" + colMap.qualifierName + "] but serialization.class or serialization.format was set"); + } + option = ObjectInspectorOptions.JAVA; + return new ColumnProperties(option, null, null); + } else { + if (serType.equalsIgnoreCase(AVRO_SERIALIZATION_TYPE)) { + + if (schemaLiteral == null && serClassName == null && schemaUrl == null + && avroSchemaRetrieverClass == null) { + throw new IllegalArgumentException("serialization.type was set to [" + serType + "] but neither " + + AvroSerdeUtils.SCHEMA_LITERAL + ", " + AvroSerdeUtils.SCHEMA_URL + + ", serialization.class or " + AvroSerdeUtils.SCHEMA_RETRIEVER + " property was set"); + } + + option = ObjectInspectorOptions.AVRO; + } else { + throw new IllegalArgumentException("Illegal serialization.type [" + serType + "] found for column family [" + + colMap.familyName + "] and qualifier [" + colMap.qualifierName + "]"); + } + } + + Class deserializerClass = null; + + if (serClassName != null) { + deserializerClass = conf.getClassByName(serClassName); + } + + Schema schema = null; + + // only worry about getting schema if we are dealing with Avro + if (serType.equalsIgnoreCase(AVRO_SERIALIZATION_TYPE)) { + + if (avroSchemaRetrieverClass == null) { + + // bother about generating a schema only if a schema retriever class wasn't provided + if (schemaLiteral != null) { + schema = Schema.parse(schemaLiteral); + } else if (schemaUrl != null) { + schema = HBaseSerDeHelper.getSchemaFromFS(schemaUrl, conf); + } else if (deserializerClass != null) { + schema = ReflectData.get().getSchema(deserializerClass); + } + } + } + + return new ColumnProperties(option, deserializerClass, schema); + } + + /** + * Class representing the properties for a single hive column + * */ + class ColumnProperties { + private final ObjectInspectorOptions type; + private final Class deserializerClass; + private final Schema schema; + + ColumnProperties(ObjectInspectorOptions type, Class deserializerClass, Schema schema) { + this.type = type; + this.deserializerClass = deserializerClass; + this.schema = schema; + } + + public ObjectInspectorOptions getType() { + return type; + } + + public Class getDeserializerClass() { + return deserializerClass; + } + + public Schema getSchema() { + return schema; + } + } } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java new file mode 100644 index 0000000..c52ff24 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java @@ -0,0 +1,625 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.hbase; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; + +import org.apache.avro.Schema; +import org.apache.avro.reflect.ReflectData; +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.util.StringUtils; + +/** + * Helper class for {@link HBaseSerDe} + * */ +public class HBaseSerDeHelper { + + /** + * Logger + * */ + public static final Log LOG = LogFactory.getLog(HBaseSerDeHelper.class); + + /** + * Autogenerates the column names from the given column mapping + * + * @param tbl + * the hive table properties + * @param columnsMapping + * the hbase columns mapping determining hbase column families and + * qualifiers + * @param sb + * StringBuilder to form the list of columns + * @throws IllegalArgumentException + * if columnsMapping or sb is null + * */ + public static void generateColumns(Properties tbl, + List columnsMapping, StringBuilder sb) { + // Generate the columns according to the column mapping provided. + // Note: The generated column names follow the form family_name.qualifier_name. If the + // qualifier name is null, the column is named familyname_col[i], where i is the index + // of the column in the mapping (0 to n-1, n being the size of the mapping). The filter + // function strips all characters other than letters and digits from the family and + // qualifier names, since the only special character allowed in a column name is "_", + // which separates the family from the qualifier.
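+ // For example, a (hypothetical) mapping ":key,cf-A:q.1,meta:" generates the
+ // columns "key,cfa_q1,meta_col2".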
+ + if (columnsMapping == null) { + throw new IllegalArgumentException("columnsMapping cannot be null"); + } + + if (sb == null) { + throw new IllegalArgumentException("StringBuilder cannot be null"); + } + + for (int i = 0; i < columnsMapping.size(); i++) { + ColumnMapping colMap = columnsMapping.get(i); + + if (colMap.hbaseRowKey) { + sb.append("key").append(StringUtils.COMMA_STR); + } else if (colMap.qualifierName == null) { + // this corresponds to a map + + if (colMap.qualifierPrefix != null) { + sb.append(filter(colMap.familyName)).append("_") + .append(filter(colMap.qualifierPrefix) + i) + .append(StringUtils.COMMA_STR); + } else { + sb.append(filter(colMap.familyName)).append("_").append("col" + i) + .append(StringUtils.COMMA_STR); + } + } else { + // just an individual column + sb.append(filter(colMap.familyName)).append("_") + .append(filter(colMap.qualifierName)).append(StringUtils.COMMA_STR); + } + } + + // trim off the ending ",", if any + trim(sb); + + if (LOG.isDebugEnabled()) { + LOG.debug("Generated columns: [" + sb.toString() + "]"); + } + } + + /** + * Autogenerates the column types from the given column mapping and table properties + * + * @param tbl + * the hive table properties + * @param columnsMapping + * the hbase columns mapping determining hbase column families and + * qualifiers + * @param sb + * StringBuilder to form the list of column types + * @param conf + * configuration + * @throws IllegalArgumentException + * if any of the given arguments is null + * @throws SerDeException + * if there was an error generating the column types + * */ + public static void generateColumnTypes(Properties tbl, + List columnsMapping, StringBuilder sb, Configuration conf) + throws SerDeException { + + if (tbl == null) { + throw new IllegalArgumentException("tbl cannot be null"); + } + + if (columnsMapping == null) { + throw new IllegalArgumentException("columnsMapping cannot be null"); + } + + if (sb == null) { + throw new IllegalArgumentException("StringBuilder cannot be null"); + } + + // Generate the columns according to the column mapping provided + for (int i = 0; i < columnsMapping.size(); i++) { + if (sb.length() > 0) { + sb.append(":"); + } + + ColumnMapping colMap = columnsMapping.get(i); + + if (colMap.hbaseRowKey) { + + Map compositeKeyParts = getCompositeKeyParts(tbl); + StringBuilder keyStruct = new StringBuilder(); + + if (compositeKeyParts == null || compositeKeyParts.isEmpty()) { + String compKeyClass = tbl + .getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS); + String compKeyTypes = tbl + .getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_TYPES); + + if (compKeyTypes == null) { + + if (compKeyClass != null) { + // a composite key class was provided, but the types property was + // not set and the getParts() method of HBaseCompositeKey was not + // overridden in the implementation. Flag an exception. + throw new SerDeException( + "Either the hbase.composite.key.types property should be set or the getParts method must be overridden in " + + compKeyClass); + } + + // the row key column becomes a STRING + sb.append(serdeConstants.STRING_TYPE_NAME); + } else { + generateKeyStruct(compKeyTypes, keyStruct); + } + } else { + generateKeyStruct(compositeKeyParts, keyStruct); + } + sb.append(keyStruct); + } else if (colMap.qualifierName == null) { + + String serClassName = null; + String serType = null; + String schemaLiteral = null; + String schemaUrl = null; + + if (colMap.qualifierPrefix != null) { + + serType = tbl.getProperty(colMap.familyName + "."
+ + colMap.qualifierPrefix + "." + HBaseSerDe.SERIALIZATION_TYPE); + + if (serType == null) { + throw new SerDeException(HBaseSerDe.SERIALIZATION_TYPE + + " property not provided for column family [" + + colMap.familyName + "] and prefix [" + colMap.qualifierPrefix + + "]"); + } + + // we are provided with a prefix + serClassName = tbl.getProperty(colMap.familyName + "." + + colMap.qualifierPrefix + "." + + serdeConstants.SERIALIZATION_CLASS); + + if (serClassName == null) { + if (serType.equalsIgnoreCase(HBaseSerDe.AVRO_SERIALIZATION_TYPE)) { + // for avro type, the serialization class parameter is optional + schemaLiteral = tbl.getProperty(colMap.familyName + "." + + colMap.qualifierPrefix + "." + + AvroSerdeUtils.SCHEMA_LITERAL); + schemaUrl = tbl.getProperty(colMap.familyName + "." + + colMap.qualifierPrefix + "." + AvroSerdeUtils.SCHEMA_URL); + + if (schemaLiteral == null && schemaUrl == null) { + // either schema literal, schema url or serialization class must + // be provided + throw new SerDeException("For an avro schema, either " + + AvroSerdeUtils.SCHEMA_LITERAL + ", " + + AvroSerdeUtils.SCHEMA_URL + " or " + + serdeConstants.SERIALIZATION_CLASS + + " property must be set."); + } + + if (schemaUrl != null) { + schemaLiteral = getSchemaFromFS(schemaUrl, conf).toString(); + } + + } else { + throw new SerDeException(serdeConstants.SERIALIZATION_CLASS + + " property not provided for column family [" + + colMap.familyName + "] and prefix [" + + colMap.qualifierPrefix + "]"); + } + } + } else { + serType = tbl.getProperty(colMap.familyName + "." + + HBaseSerDe.SERIALIZATION_TYPE); + + if (serType == null) { + throw new SerDeException(HBaseSerDe.SERIALIZATION_TYPE + + " property not provided for column family [" + + colMap.familyName + "]"); + } + + serClassName = tbl.getProperty(colMap.familyName + "." + + serdeConstants.SERIALIZATION_CLASS); + + if (serClassName == null) { + + if (serType.equalsIgnoreCase(HBaseSerDe.AVRO_SERIALIZATION_TYPE)) { + // for avro type, the serialization class parameter is optional + schemaLiteral = tbl.getProperty(colMap.familyName + "." + + AvroSerdeUtils.SCHEMA_LITERAL); + schemaUrl = tbl.getProperty(colMap.familyName + "." + + AvroSerdeUtils.SCHEMA_URL); + + if (schemaLiteral == null && schemaUrl == null) { + // either schema literal or serialization class must be provided + throw new SerDeException("For an avro schema, either " + + AvroSerdeUtils.SCHEMA_LITERAL + " property or " + + serdeConstants.SERIALIZATION_CLASS + + " property must be set."); + } + + if (schemaUrl != null) { + schemaLiteral = getSchemaFromFS(schemaUrl, conf).toString(); + } + } else { + throw new SerDeException(serdeConstants.SERIALIZATION_CLASS + + " property not provided for column family [" + + colMap.familyName + "]"); + } + } + } + + StringBuilder generatedStruct = new StringBuilder(); + + // generate struct for each of the given prefixes + generateColumnStruct(serType, serClassName, schemaLiteral, colMap, + generatedStruct); + + // a column family becomes a MAP + sb.append(serdeConstants.MAP_TYPE_NAME + "<" + + serdeConstants.STRING_TYPE_NAME + "," + generatedStruct + ">"); + + } else { + + String qualifierName = colMap.qualifierName; + + if (colMap.qualifierName.endsWith("*")) { + // we are provided with a prefix + qualifierName = colMap.qualifierName.substring(0, + colMap.qualifierName.length() - 1); + } + + String serType = tbl.getProperty(colMap.familyName + "." + + qualifierName + "." 
+ HBaseSerDe.SERIALIZATION_TYPE); + + if (serType == null) { + throw new SerDeException(HBaseSerDe.SERIALIZATION_TYPE + + " property not provided for column family [" + + colMap.familyName + "] and qualifier [" + qualifierName + "]"); + } + + String serClassName = tbl.getProperty(colMap.familyName + "." + + qualifierName + "." + serdeConstants.SERIALIZATION_CLASS); + + String schemaLiteral = null; + String schemaUrl = null; + + if (serClassName == null) { + + if (serType.equalsIgnoreCase(HBaseSerDe.AVRO_SERIALIZATION_TYPE)) { + // for avro type, the serialization class parameter is optional + schemaLiteral = tbl.getProperty(colMap.familyName + "." + + qualifierName + "." + AvroSerdeUtils.SCHEMA_LITERAL); + schemaUrl = tbl.getProperty(colMap.familyName + "." + qualifierName + + "." + AvroSerdeUtils.SCHEMA_URL); + + if (schemaLiteral == null && schemaUrl == null) { + // either schema literal, schema url or serialization class must + // be provided + throw new SerDeException("For an avro schema, either " + + AvroSerdeUtils.SCHEMA_LITERAL + ", " + + AvroSerdeUtils.SCHEMA_URL + " or " + + serdeConstants.SERIALIZATION_CLASS + + " property must be set."); + } + + if (schemaUrl != null) { + schemaLiteral = getSchemaFromFS(schemaUrl, conf).toString(); + } + } else { + throw new SerDeException(serdeConstants.SERIALIZATION_CLASS + + " property not provided for column family [" + + colMap.familyName + "] and qualifier [" + qualifierName + "]"); + } + } + + StringBuilder generatedStruct = new StringBuilder(); + + generateColumnStruct(serType, serClassName, schemaLiteral, colMap, + generatedStruct); + + sb.append(generatedStruct); + } + } + + // trim off ending ",", if any + trim(sb); + + if (LOG.isDebugEnabled()) { + LOG.debug("Generated column types: [" + sb.toString() + "]"); + } + } + + /** + * Read the schema from the given hdfs url for the schema + * */ + public static Schema getSchemaFromFS(String schemaFSUrl, Configuration conf) + throws SerDeException { + FSDataInputStream in = null; + FileSystem fs = null; + try { + fs = FileSystem.get(new URI(schemaFSUrl), conf); + in = fs.open(new Path(schemaFSUrl)); + Schema s = Schema.parse(in); + return s; + } catch (URISyntaxException e) { + throw new SerDeException("Failure reading schema from filesystem", e); + } catch (IOException e) { + throw new SerDeException("Failure reading schema from filesystem", e); + } finally { + IOUtils.closeQuietly(in); + } + } + + /** + * Auto-generates the key struct for composite keys + * + * @param compositeKeyParts + * map of composite key part name to its type. 
Usually this would be + * provided by the custom implementation of {@link HBaseCompositeKey + * composite key} + * @param sb + * StringBuilder object to construct the struct + * */ + private static void generateKeyStruct(Map compositeKeyParts, + StringBuilder sb) { + sb.append("struct<"); + + for (Entry entry : compositeKeyParts.entrySet()) { + sb.append(entry.getKey()).append(":").append(entry.getValue()) + .append(","); + } + + // trim the trailing "," + trim(sb); + sb.append(">"); + } + + /** + * Auto-generates the key struct for composite keys + * + * @param compositeKeyTypes + * comma separated list of composite key types in order + * @param sb + * StringBuilder object to construct the struct + * */ + private static void generateKeyStruct(String compositeKeyTypes, + StringBuilder sb) { + sb.append("struct<"); + + // composite key types is a comma separated list of different parts of the + // composite keys in + // order in which they appear in the key + String[] keyTypes = compositeKeyTypes.split(","); + + for (int i = 0; i < keyTypes.length; i++) { + sb.append("col" + i).append(":").append(keyTypes[i]) + .append(StringUtils.COMMA_STR); + } + + // trim the trailing "," + trim(sb); + sb.append(">"); + } + + /** + * Auto-generates the protobuf/thrift/avro struct + * + * @param serType + * serialization type + * @param serClassName + * serialization class name + * @param schemaLiteral + * schema string + * @param colMap + * hbase column mapping + * @param sb + * StringBuilder to hold the generated struct + * @throws SerDeException + * if something goes wrong while generating the struct + * */ + private static void generateColumnStruct(String serType, String serClassName, + String schemaLiteral, ColumnMapping colMap, StringBuilder sb) + throws SerDeException { + + if (serType.equalsIgnoreCase(HBaseSerDe.AVRO_SERIALIZATION_TYPE)) { + + if (serClassName != null) { + generateAvroStructFromClass(serClassName, sb); + } else { + generateAvroStructFromSchema(schemaLiteral, sb); + } + } else { + throw new SerDeException("Unknown " + HBaseSerDe.SERIALIZATION_TYPE + + " found for column family [" + colMap.familyName + "]"); + } + } + + /** + * Auto-generate the avro struct from class + * + * @param serClassName + * serialization class for avro struct + * @param sb + * StringBuilder to hold the generated struct + * @throws SerDeException + * if something goes wrong while generating the struct + * */ + private static void generateAvroStructFromClass(String serClassName, + StringBuilder sb) throws SerDeException { + Class serClass; + try { + serClass = Class.forName(serClassName); + } catch (ClassNotFoundException e) { + throw new SerDeException( + "Error obtaining descriptor for " + serClassName, e); + } + + Schema schema = ReflectData.get().getSchema(serClass); + + generateAvroStructFromSchema(schema, sb); + } + + /** + * Auto-generate the avro struct from schema + * + * @param schemaLiteral + * schema for the avro struct as string + * @param sb + * StringBuilder to hold the generated struct + * @throws SerDeException + * if something goes wrong while generating the struct + * */ + private static void generateAvroStructFromSchema(String schemaLiteral, + StringBuilder sb) throws SerDeException { + Schema schema = Schema.parse(schemaLiteral); + + generateAvroStructFromSchema(schema, sb); + } + + /** + * Auto-generate the avro struct from schema + * + * @param schema + * schema for the avro struct + * @param sb + * StringBuilder to hold the generated struct + * @throws SerDeException + * if something 
goes wrong while generating the struct + * */ + private static void generateAvroStructFromSchema(Schema schema, + StringBuilder sb) throws SerDeException { + AvroObjectInspectorGenerator avig = new AvroObjectInspectorGenerator(schema); + + sb.append("struct<"); + + // Get the column names and their corresponding types + List columnNames = avig.getColumnNames(); + List columnTypes = avig.getColumnTypes(); + + if (columnNames.size() != columnTypes.size()) { + throw new AssertionError( + "The number of column names should match the number of column types"); + } + + for (int i = 0; i < columnNames.size(); i++) { + sb.append(columnNames.get(i)); + sb.append(":"); + sb.append(columnTypes.get(i).getTypeName()); + sb.append(","); + } + + trim(sb).append(">"); + } + + /** + * Trims by removing the trailing "," if any + * + * @param sb + * StringBuilder to trim + * @return the trimmed StringBuilder + * */ + private static StringBuilder trim(StringBuilder sb) { + if (sb.charAt(sb.length() - 1) == StringUtils.COMMA) { + return sb.deleteCharAt(sb.length() - 1); + } + + return sb; + } + + /** + * Filters the given name by removing all special characters and converting it + * to lowercase + * */ + private static String filter(String name) { + return name.replaceAll("[^a-zA-Z0-9]+", "").toLowerCase(); + } + + /** + * Returns the parts of the composite key, as provided by the custom composite key class. + * + * @param tbl + * Properties for the table + * @return a map from composite key part name to its type + * @throws SerDeException + * if something goes wrong while getting the composite key parts + * */ + @SuppressWarnings("unchecked") + private static Map getCompositeKeyParts(Properties tbl) + throws SerDeException { + String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS); + + Object types = null; + + if (compKeyClass == null) { + // no custom composite key class provided. return null + return null; + } + + try { + Class clazz = Class.forName(compKeyClass); + Object compositeKeyObj = clazz.getDeclaredConstructor( + LazySimpleStructObjectInspector.class, Properties.class, + Configuration.class).newInstance(null, tbl, null); + + types = clazz.getMethod("getParts").invoke(compositeKeyObj); + } catch (ClassNotFoundException e) { + throw new SerDeException(e); + } catch (IllegalArgumentException e) { + throw new SerDeException(e); + } catch (SecurityException e) { + throw new SerDeException(e); + } catch (InstantiationException e) { + throw new SerDeException(e); + } catch (IllegalAccessException e) { + throw new SerDeException(e); + } catch (InvocationTargetException e) { + throw new SerDeException(e); + } catch (NoSuchMethodException e) { + throw new SerDeException(e); + } + + return (Map) types; + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java index cedef10..2727b36 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java @@ -149,8 +149,16 @@ public Object getMapValueElement(Object key) { } if (keyI.equals(key)) { // Got a match, return the value - LazyObject v = (LazyObject) entry.getValue(); - return v == null ? v : v.getObject(); + Object _value = entry.getValue(); + + // If the value is a LazyObject, deserialize it to its underlying object. + // Otherwise return it as-is. + if (_value instanceof LazyObject) { + LazyObject v = (LazyObject) entry.getValue(); + return v == null ?
v : v.getObject(); + } else { + return _value; + } } } diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestAvroSchemaRetriever.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestAvroSchemaRetriever.java new file mode 100644 index 0000000..ce31d6f --- /dev/null +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestAvroSchemaRetriever.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.hbase; + +import java.util.Properties; + +import org.apache.avro.Schema; +import org.apache.avro.reflect.ReflectData; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.serde2.avro.AvroSchemaRetriever; + +/** + * Mock implementation + * */ +public class HBaseTestAvroSchemaRetriever extends AvroSchemaRetriever { + + private static final byte[] TEST_BYTE_ARRAY = Bytes.toBytes("test"); + + public HBaseTestAvroSchemaRetriever(Configuration conf, Properties tbl) { + } + + @Override + public Schema retrieveWriterSchema(Object source) { + Class clazz; + try { + clazz = Class.forName("org.apache.hadoop.hive.hbase.avro.Employee"); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + return ReflectData.get().getSchema(clazz); + } + + @Override + public Schema retrieveReaderSchema(Object source) { + Class clazz; + try { + clazz = Class.forName("org.apache.hadoop.hive.hbase.avro.Employee"); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + return ReflectData.get().getSchema(clazz); + } + + @Override + public int getOffset() { + return TEST_BYTE_ARRAY.length; + } +} diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java index 13c344b..c30cc90 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.hbase; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import org.apache.hadoop.conf.Configuration; @@ -32,7 +34,8 @@ Properties tbl; Configuration conf; - public HBaseTestCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl, Configuration conf) { + public HBaseTestCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl, + Configuration conf) { super(oi); this.tbl = tbl; this.conf = conf; @@ -54,4 +57,15 @@ public Object getField(int fieldID) { return toLazyObject(fieldID, fieldBytes); } + + @Override + public Map getParts() { + Map partsMap = new HashMap(); + + partsMap.put("A", "string"); + partsMap.put("B", "string"); + 
partsMap.put("C", "string"); + + return partsMap; + } } diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java index 089a31a..3577c4d 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java @@ -22,23 +22,40 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import junit.framework.TestCase; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.hbase.avro.Employee; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.BooleanWritable; @@ -53,25 +70,85 @@ */ public class TestHBaseSerDe extends TestCase { + private static final byte[] TEST_BYTE_ARRAY = Bytes.toBytes("test"); + + private static final String RECORD_SCHEMA = "{\n" + + " \"namespace\": \"testing.test.mctesty\",\n" + + " \"name\": \"oneRecord\",\n" + + " \"type\": \"record\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\":\"aRecord\",\n" + + " \"type\":{\"type\":\"record\",\n" + + " \"name\":\"recordWithinARecord\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\":\"int1\",\n" + + " \"type\":\"int\"\n" + + " },\n" + + " {\n" + + " \"name\":\"boolean1\",\n" + + " \"type\":\"boolean\"\n" + + " },\n" + + " {\n" + + " \"name\":\"long1\",\n" + + " \"type\":\"long\"\n" + + " }\n" + + " ]}\n" + + " }\n" + + " ]\n" + + "}"; + + private static final String RECORD_SCHEMA_EVOLVED = "{\n" + + " \"namespace\": \"testing.test.mctesty\",\n" + + " \"name\": \"oneRecord\",\n" + + " \"type\": \"record\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\":\"aRecord\",\n" + + " \"type\":{\"type\":\"record\",\n" + + " \"name\":\"recordWithinARecord\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\":\"int1\",\n" + + " \"type\":\"int\"\n" + + " },\n" + + " {\n" + + " \"name\":\"string1\",\n" + + " \"type\":\"string\", \"default\": \"test\"\n" + + " },\n" + + " {\n" + + " \"name\":\"boolean1\",\n" + + " \"type\":\"boolean\"\n" + + " },\n" + + " {\n" + + " \"name\":\"long1\",\n" + + " 
\"type\":\"long\"\n" + + " }\n" + + " ]}\n" + + " }\n" + + " ]\n" + + "}"; + /** * Test the default behavior of the Lazy family of objects and object inspectors. */ public void testHBaseSerDeI() throws SerDeException { - byte [] cfa = "cola".getBytes(); - byte [] cfb = "colb".getBytes(); - byte [] cfc = "colc".getBytes(); - - byte [] qualByte = "byte".getBytes(); - byte [] qualShort = "short".getBytes(); - byte [] qualInt = "int".getBytes(); - byte [] qualLong = "long".getBytes(); - byte [] qualFloat = "float".getBytes(); - byte [] qualDouble = "double".getBytes(); - byte [] qualString = "string".getBytes(); - byte [] qualBool = "boolean".getBytes(); + byte[] cfa = "cola".getBytes(); + byte[] cfb = "colb".getBytes(); + byte[] cfc = "colc".getBytes(); + + byte[] qualByte = "byte".getBytes(); + byte[] qualShort = "short".getBytes(); + byte[] qualInt = "int".getBytes(); + byte[] qualLong = "long".getBytes(); + byte[] qualFloat = "float".getBytes(); + byte[] qualDouble = "double".getBytes(); + byte[] qualString = "string".getBytes(); + byte[] qualBool = "boolean".getBytes(); - byte [] rowKey = Bytes.toBytes("test-row1"); + byte[] rowKey = Bytes.toBytes("test-row1"); // Data List kvs = new ArrayList(); @@ -100,15 +177,15 @@ public void testHBaseSerDeI() throws SerDeException { p.add(cfb, qualBool, Bytes.toBytes("true")); Object[] expectedFieldsData = { - new Text("test-row1"), - new ByteWritable((byte)123), - new ShortWritable((short)456), - new IntWritable(789), - new LongWritable(1000), - new FloatWritable(-0.01F), - new DoubleWritable(5.3), - new Text("Hadoop, HBase, and Hive"), - new BooleanWritable(true) + new Text("test-row1"), + new ByteWritable((byte) 123), + new ShortWritable((short) 456), + new IntWritable(789), + new LongWritable(1000), + new FloatWritable(-0.01F), + new DoubleWritable(5.3), + new Text("Hadoop, HBase, and Hive"), + new BooleanWritable(true) }; // Create, initialize, and test the SerDe @@ -148,24 +225,24 @@ public void testHBaseSerDeWithTimestamp() throws SerDeException { Properties tbl = createPropertiesI_I(); long putTimestamp = 1; tbl.setProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP, - Long.toString(putTimestamp)); + Long.toString(putTimestamp)); serDe.initialize(conf, tbl); - byte [] cfa = "cola".getBytes(); - byte [] cfb = "colb".getBytes(); - byte [] cfc = "colc".getBytes(); - - byte [] qualByte = "byte".getBytes(); - byte [] qualShort = "short".getBytes(); - byte [] qualInt = "int".getBytes(); - byte [] qualLong = "long".getBytes(); - byte [] qualFloat = "float".getBytes(); - byte [] qualDouble = "double".getBytes(); - byte [] qualString = "string".getBytes(); - byte [] qualBool = "boolean".getBytes(); + byte[] cfa = "cola".getBytes(); + byte[] cfb = "colb".getBytes(); + byte[] cfc = "colc".getBytes(); + + byte[] qualByte = "byte".getBytes(); + byte[] qualShort = "short".getBytes(); + byte[] qualInt = "int".getBytes(); + byte[] qualLong = "long".getBytes(); + byte[] qualFloat = "float".getBytes(); + byte[] qualDouble = "double".getBytes(); + byte[] qualString = "string".getBytes(); + byte[] qualBool = "boolean".getBytes(); - byte [] rowKey = Bytes.toBytes("test-row1"); + byte[] rowKey = Bytes.toBytes("test-row1"); // Data List kvs = new ArrayList(); @@ -182,7 +259,7 @@ public void testHBaseSerDeWithTimestamp() throws SerDeException { Result r = new Result(kvs); - Put p = new Put(rowKey,putTimestamp); + Put p = new Put(rowKey, putTimestamp); p.add(cfa, qualByte, Bytes.toBytes("123")); p.add(cfb, qualShort, Bytes.toBytes("456")); @@ -194,15 +271,15 @@ public void 
testHBaseSerDeWithTimestamp() throws SerDeException { p.add(cfb, qualBool, Bytes.toBytes("true")); Object[] expectedFieldsData = { - new Text("test-row1"), - new ByteWritable((byte)123), - new ShortWritable((short)456), - new IntWritable(789), - new LongWritable(1000), - new FloatWritable(-0.01F), - new DoubleWritable(5.3), - new Text("Hadoop, HBase, and Hive"), - new BooleanWritable(true) + new Text("test-row1"), + new ByteWritable((byte) 123), + new ShortWritable((short) 456), + new IntWritable(789), + new LongWritable(1000), + new FloatWritable(-0.01F), + new DoubleWritable(5.3), + new Text("Hadoop, HBase, and Hive"), + new BooleanWritable(true) }; deserializeAndSerialize(serDe, r, p, expectedFieldsData); @@ -214,7 +291,7 @@ private void deserializeAndSerialize( // Get the row structure StructObjectInspector oi = (StructObjectInspector) - serDe.getObjectInspector(); + serDe.getObjectInspector(); List fieldRefs = oi.getAllStructFieldRefs(); assertEquals(9, fieldRefs.size()); @@ -223,14 +300,14 @@ private void deserializeAndSerialize( for (int i = 0; i < fieldRefs.size(); i++) { Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i)); if (fieldData != null) { - fieldData = ((LazyPrimitive)fieldData).getWritableObject(); + fieldData = ((LazyPrimitive) fieldData).getWritableObject(); } assertEquals("Field " + i, expectedFieldsData[i], fieldData); } // Serialize assertEquals(PutWritable.class, serDe.getSerializedClass()); PutWritable serializedPut = (PutWritable) serDe.serialize(row, oi); - assertEquals("Serialized data", p.toString(),String.valueOf(serializedPut.getPut())); + assertEquals("Serialized data", p.toString(), String.valueOf(serializedPut.getPut())); } // No specifications default to UTF8 String storage for backwards compatibility @@ -259,7 +336,7 @@ private Properties createPropertiesI_II() { "string,tinyint:smallint:int:bigint:float:double:string:boolean"); tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key#-,cola:byte#s,colb:short#-,colc:int#s,cola:long#s,colb:float#-,colc:double#-," + - "cola:string#s,colb:boolean#s"); + "cola:string#s,colb:boolean#s"); return tbl; } @@ -274,7 +351,7 @@ private Properties createPropertiesI_III() { "string,tinyint:smallint:int:bigint:float:double:string:boolean"); tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key#s,cola:byte#s,colb:short#s,colc:int#s,cola:long#s,colb:float#s,colc:double#s," + - "cola:string#s,colb:boolean#s"); + "cola:string#s,colb:boolean#s"); tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary"); return tbl; } @@ -291,39 +368,39 @@ private Properties createPropertiesI_IV() { "string,tinyint:smallint:int:bigint:float:double:string:boolean"); tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key#-,cola:byte#s,colb:short#s,colc:int#s,cola:long#s,colb:float#s,colc:double#s," + - "cola:string#b,colb:boolean#s"); + "cola:string#b,colb:boolean#s"); tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary"); return tbl; } public void testHBaseSerDeII() throws SerDeException { - byte [] cfa = "cfa".getBytes(); - byte [] cfb = "cfb".getBytes(); - byte [] cfc = "cfc".getBytes(); + byte[] cfa = "cfa".getBytes(); + byte[] cfb = "cfb".getBytes(); + byte[] cfc = "cfc".getBytes(); - byte [] qualByte = "byte".getBytes(); - byte [] qualShort = "short".getBytes(); - byte [] qualInt = "int".getBytes(); - byte [] qualLong = "long".getBytes(); - byte [] qualFloat = "float".getBytes(); - byte [] qualDouble = "double".getBytes(); - byte [] qualString = "string".getBytes(); - byte [] qualBool = 
"boolean".getBytes(); + byte[] qualByte = "byte".getBytes(); + byte[] qualShort = "short".getBytes(); + byte[] qualInt = "int".getBytes(); + byte[] qualLong = "long".getBytes(); + byte[] qualFloat = "float".getBytes(); + byte[] qualDouble = "double".getBytes(); + byte[] qualString = "string".getBytes(); + byte[] qualBool = "boolean".getBytes(); - byte [] rowKey = Bytes.toBytes("test-row-2"); + byte[] rowKey = Bytes.toBytes("test-row-2"); // Data List kvs = new ArrayList(); - kvs.add(new KeyValue(rowKey, cfa, qualByte, new byte [] { Byte.MIN_VALUE })); + kvs.add(new KeyValue(rowKey, cfa, qualByte, new byte[] {Byte.MIN_VALUE})); kvs.add(new KeyValue(rowKey, cfb, qualShort, Bytes.toBytes(Short.MIN_VALUE))); kvs.add(new KeyValue(rowKey, cfc, qualInt, Bytes.toBytes(Integer.MIN_VALUE))); kvs.add(new KeyValue(rowKey, cfa, qualLong, Bytes.toBytes(Long.MIN_VALUE))); kvs.add(new KeyValue(rowKey, cfb, qualFloat, Bytes.toBytes(Float.MIN_VALUE))); kvs.add(new KeyValue(rowKey, cfc, qualDouble, Bytes.toBytes(Double.MAX_VALUE))); kvs.add(new KeyValue(rowKey, cfa, qualString, Bytes.toBytes( - "Hadoop, HBase, and Hive Again!"))); + "Hadoop, HBase, and Hive Again!"))); kvs.add(new KeyValue(rowKey, cfb, qualBool, Bytes.toBytes(false))); Collections.sort(kvs, KeyValue.COMPARATOR); @@ -331,7 +408,7 @@ public void testHBaseSerDeII() throws SerDeException { Put p = new Put(rowKey); - p.add(cfa, qualByte, new byte [] { Byte.MIN_VALUE }); + p.add(cfa, qualByte, new byte[] {Byte.MIN_VALUE}); p.add(cfb, qualShort, Bytes.toBytes(Short.MIN_VALUE)); p.add(cfc, qualInt, Bytes.toBytes(Integer.MIN_VALUE)); p.add(cfa, qualLong, Bytes.toBytes(Long.MIN_VALUE)); @@ -341,15 +418,15 @@ public void testHBaseSerDeII() throws SerDeException { p.add(cfb, qualBool, Bytes.toBytes(false)); Object[] expectedFieldsData = { - new Text("test-row-2"), - new ByteWritable(Byte.MIN_VALUE), - new ShortWritable(Short.MIN_VALUE), - new IntWritable(Integer.MIN_VALUE), - new LongWritable(Long.MIN_VALUE), - new FloatWritable(Float.MIN_VALUE), - new DoubleWritable(Double.MAX_VALUE), - new Text("Hadoop, HBase, and Hive Again!"), - new BooleanWritable(false) + new Text("test-row-2"), + new ByteWritable(Byte.MIN_VALUE), + new ShortWritable(Short.MIN_VALUE), + new IntWritable(Integer.MIN_VALUE), + new LongWritable(Long.MIN_VALUE), + new FloatWritable(Float.MIN_VALUE), + new DoubleWritable(Double.MAX_VALUE), + new Text("Hadoop, HBase, and Hive Again!"), + new BooleanWritable(false) }; // Create, initialize, and test the SerDe @@ -385,7 +462,7 @@ private Properties createPropertiesII_I() { "string,tinyint:smallint:int:bigint:float:double:string:boolean"); tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key#-,cfa:byte#b,cfb:short#b,cfc:int#-,cfa:long#b,cfb:float#-,cfc:double#b," + - "cfa:string#b,cfb:boolean#-"); + "cfa:string#b,cfb:boolean#-"); tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary"); return tbl; } @@ -400,7 +477,7 @@ private Properties createPropertiesII_II() { "string,tinyint:smallint:int:bigint:float:double:string:boolean"); tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key#b,cfa:byte#b,cfb:short#b,cfc:int#b,cfa:long#b,cfb:float#b,cfc:double#b," + - "cfa:string#b,cfb:boolean#b"); + "cfa:string#b,cfb:boolean#b"); tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string"); return tbl; } @@ -415,46 +492,46 @@ private Properties createPropertiesII_III() { "string,tinyint:smallint:int:bigint:float:double:string:boolean"); tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, 
":key#-,cfa:byte#b,cfb:short#b,cfc:int#b,cfa:long#b,cfb:float#b,cfc:double#b," + - "cfa:string#-,cfb:boolean#b"); + "cfa:string#-,cfb:boolean#b"); return tbl; } public void testHBaseSerDeWithHiveMapToHBaseColumnFamily() throws SerDeException { - byte [] cfint = "cf-int".getBytes(); - byte [] cfbyte = "cf-byte".getBytes(); - byte [] cfshort = "cf-short".getBytes(); - byte [] cflong = "cf-long".getBytes(); - byte [] cffloat = "cf-float".getBytes(); - byte [] cfdouble = "cf-double".getBytes(); - byte [] cfbool = "cf-bool".getBytes(); + byte[] cfint = "cf-int".getBytes(); + byte[] cfbyte = "cf-byte".getBytes(); + byte[] cfshort = "cf-short".getBytes(); + byte[] cflong = "cf-long".getBytes(); + byte[] cffloat = "cf-float".getBytes(); + byte[] cfdouble = "cf-double".getBytes(); + byte[] cfbool = "cf-bool".getBytes(); - byte [][] columnFamilies = - new byte [][] {cfint, cfbyte, cfshort, cflong, cffloat, cfdouble, cfbool}; + byte[][] columnFamilies = + new byte[][] {cfint, cfbyte, cfshort, cflong, cffloat, cfdouble, cfbool}; - byte [][] rowKeys = new byte [][] { + byte[][] rowKeys = new byte[][] { Integer.toString(1).getBytes(), Integer.toString(Integer.MIN_VALUE).getBytes(), Integer.toString(Integer.MAX_VALUE).getBytes() }; - byte [][][] columnQualifiersAndValues = new byte [][][] { - {Bytes.toBytes(1), new byte [] {1}, Bytes.toBytes((short) 1), - Bytes.toBytes((long) 1), Bytes.toBytes((float) 1.0F), Bytes.toBytes(1.0), - Bytes.toBytes(true)}, - {Bytes.toBytes(Integer.MIN_VALUE), new byte [] {Byte.MIN_VALUE}, - Bytes.toBytes((short) Short.MIN_VALUE), Bytes.toBytes((long) Long.MIN_VALUE), - Bytes.toBytes((float) Float.MIN_VALUE), Bytes.toBytes(Double.MIN_VALUE), - Bytes.toBytes(false)}, - {Bytes.toBytes(Integer.MAX_VALUE), new byte [] {Byte.MAX_VALUE}, - Bytes.toBytes((short) Short.MAX_VALUE), Bytes.toBytes((long) Long.MAX_VALUE), - Bytes.toBytes((float) Float.MAX_VALUE), Bytes.toBytes(Double.MAX_VALUE), - Bytes.toBytes(true)} + byte[][][] columnQualifiersAndValues = new byte[][][] { + {Bytes.toBytes(1), new byte[] {1}, Bytes.toBytes((short) 1), + Bytes.toBytes((long) 1), Bytes.toBytes((float) 1.0F), Bytes.toBytes(1.0), + Bytes.toBytes(true)}, + {Bytes.toBytes(Integer.MIN_VALUE), new byte[] {Byte.MIN_VALUE}, + Bytes.toBytes((short) Short.MIN_VALUE), Bytes.toBytes((long) Long.MIN_VALUE), + Bytes.toBytes((float) Float.MIN_VALUE), Bytes.toBytes(Double.MIN_VALUE), + Bytes.toBytes(false)}, + {Bytes.toBytes(Integer.MAX_VALUE), new byte[] {Byte.MAX_VALUE}, + Bytes.toBytes((short) Short.MAX_VALUE), Bytes.toBytes((long) Long.MAX_VALUE), + Bytes.toBytes((float) Float.MAX_VALUE), Bytes.toBytes(Double.MAX_VALUE), + Bytes.toBytes(true)} }; List kvs = new ArrayList(); - Result [] r = new Result [] {null, null, null}; - Put [] p = new Put [] {null, null, null}; + Result[] r = new Result[] {null, null, null}; + Put[] p = new Put[] {null, null, null}; for (int i = 0; i < r.length; i++) { kvs.clear(); @@ -470,18 +547,18 @@ public void testHBaseSerDeWithHiveMapToHBaseColumnFamily() throws SerDeException r[i] = new Result(kvs); } - Object [][] expectedData = { + Object[][] expectedData = { {new Text(Integer.toString(1)), new IntWritable(1), new ByteWritable((byte) 1), - new ShortWritable((short) 1), new LongWritable(1), new FloatWritable(1.0F), - new DoubleWritable(1.0), new BooleanWritable(true)}, + new ShortWritable((short) 1), new LongWritable(1), new FloatWritable(1.0F), + new DoubleWritable(1.0), new BooleanWritable(true)}, {new Text(Integer.toString(Integer.MIN_VALUE)), new IntWritable(Integer.MIN_VALUE), - new 
ByteWritable(Byte.MIN_VALUE), new ShortWritable(Short.MIN_VALUE), - new LongWritable(Long.MIN_VALUE), new FloatWritable(Float.MIN_VALUE), - new DoubleWritable(Double.MIN_VALUE), new BooleanWritable(false)}, + new ByteWritable(Byte.MIN_VALUE), new ShortWritable(Short.MIN_VALUE), + new LongWritable(Long.MIN_VALUE), new FloatWritable(Float.MIN_VALUE), + new DoubleWritable(Double.MIN_VALUE), new BooleanWritable(false)}, {new Text(Integer.toString(Integer.MAX_VALUE)), new IntWritable(Integer.MAX_VALUE), - new ByteWritable(Byte.MAX_VALUE), new ShortWritable(Short.MAX_VALUE), - new LongWritable(Long.MAX_VALUE), new FloatWritable(Float.MAX_VALUE), - new DoubleWritable(Double.MAX_VALUE), new BooleanWritable(true)}}; + new ByteWritable(Byte.MAX_VALUE), new ShortWritable(Short.MAX_VALUE), + new LongWritable(Long.MAX_VALUE), new FloatWritable(Float.MAX_VALUE), + new DoubleWritable(Double.MAX_VALUE), new BooleanWritable(true)}}; HBaseSerDe hbaseSerDe = new HBaseSerDe(); Configuration conf = new Configuration(); @@ -502,12 +579,12 @@ public void testHBaseSerDeWithHiveMapToHBaseColumnFamily() throws SerDeException private void deserializeAndSerializeHiveMapHBaseColumnFamily( HBaseSerDe hbaseSerDe, - Result [] r, - Put [] p, - Object [][] expectedData, - byte [][] rowKeys, - byte [][] columnFamilies, - byte [][][] columnQualifiersAndValues) throws SerDeException { + Result[] r, + Put[] p, + Object[][] expectedData, + byte[][] rowKeys, + byte[][] columnFamilies, + byte[][][] columnQualifiersAndValues) throws SerDeException { StructObjectInspector soi = (StructObjectInspector) hbaseSerDe.getObjectInspector(); List fieldRefs = soi.getAllStructFieldRefs(); @@ -517,7 +594,7 @@ private void deserializeAndSerializeHiveMapHBaseColumnFamily( for (int i = 0; i < r.length; i++) { Object row = hbaseSerDe.deserialize(new ResultWritable(r[i])); Put serializedPut = ((PutWritable) hbaseSerDe.serialize(row, soi)).getPut(); - byte [] rowKey = serializedPut.getRow(); + byte[] rowKey = serializedPut.getRow(); for (int k = 0; k < rowKey.length; k++) { assertEquals(rowKey[k], rowKeys[i][k]); @@ -551,10 +628,10 @@ private Properties createPropertiesForHiveMapHBaseColumnFamily() { "key,valint,valbyte,valshort,vallong,valfloat,valdouble,valbool"); tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string:map:map:map:map:" - + "map:map:map"); + + "map:map:map"); tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key#-,cf-int:#b:b,cf-byte:#b:b,cf-short:#b:b,cf-long:#b:b,cf-float:#b:b,cf-double:#b:b," + - "cf-bool:#b:b"); + "cf-bool:#b:b"); return tbl; } @@ -565,31 +642,31 @@ private Properties createPropertiesForHiveMapHBaseColumnFamilyII() { "key,valint,valbyte,valshort,vallong,valfloat,valdouble,valbool"); tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string:map:map:map:map:" - + "map:map:map"); + + "map:map:map"); tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key#-,cf-int:#-:-,cf-byte:#-:-,cf-short:#-:-,cf-long:#-:-,cf-float:#-:-,cf-double:#-:-," + - "cf-bool:#-:-"); + "cf-bool:#-:-"); tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary"); return tbl; } public void testHBaseSerDeWithHiveMapToHBaseColumnFamilyII() throws SerDeException { - byte [] cfbyte = "cf-byte".getBytes(); - byte [] cfshort = "cf-short".getBytes(); - byte [] cfint = "cf-int".getBytes(); - byte [] cflong = "cf-long".getBytes(); - byte [] cffloat = "cf-float".getBytes(); - byte [] cfdouble = "cf-double".getBytes(); - byte [] cfstring = "cf-string".getBytes(); - byte [] cfbool = "cf-bool".getBytes(); + byte[] cfbyte = 
"cf-byte".getBytes(); + byte[] cfshort = "cf-short".getBytes(); + byte[] cfint = "cf-int".getBytes(); + byte[] cflong = "cf-long".getBytes(); + byte[] cffloat = "cf-float".getBytes(); + byte[] cfdouble = "cf-double".getBytes(); + byte[] cfstring = "cf-string".getBytes(); + byte[] cfbool = "cf-bool".getBytes(); - byte [][] columnFamilies = - new byte [][] {cfbyte, cfshort, cfint, cflong, cffloat, cfdouble, cfstring, cfbool}; + byte[][] columnFamilies = + new byte[][] {cfbyte, cfshort, cfint, cflong, cffloat, cfdouble, cfstring, cfbool}; - byte [] rowKey = Bytes.toBytes("row-key"); + byte[] rowKey = Bytes.toBytes("row-key"); - byte [][] columnQualifiersAndValues = new byte [][] { + byte[][] columnQualifiersAndValues = new byte[][] { Bytes.toBytes("123"), Bytes.toBytes("456"), Bytes.toBytes("789"), Bytes.toBytes("1000"), Bytes.toBytes("-0.01"), Bytes.toBytes("5.3"), Bytes.toBytes("Hive"), Bytes.toBytes("true") @@ -606,7 +683,7 @@ public void testHBaseSerDeWithHiveMapToHBaseColumnFamilyII() throws SerDeExcepti Result r = new Result(kvs); - Object [] expectedData = { + Object[] expectedData = { new Text("row-key"), new ByteWritable((byte) 123), new ShortWritable((short) 456), new IntWritable(789), new LongWritable(1000), new FloatWritable(-0.01F), new DoubleWritable(5.3), new Text("Hive"), new BooleanWritable(true) @@ -636,10 +713,10 @@ private Properties createPropertiesForHiveMapHBaseColumnFamilyII_I() { "key,valbyte,valshort,valint,vallong,valfloat,valdouble,valstring,valbool"); tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string:map:map:map:map:" - + "map:map:map:map"); + + "map:map:map:map"); tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key#s,cf-byte:#-:s,cf-short:#s:-,cf-int:#s:s,cf-long:#-:-,cf-float:#s:-,cf-double:#-:s," + - "cf-string:#s:s,cf-bool:#-:-"); + "cf-string:#s:s,cf-bool:#-:-"); return tbl; } @@ -650,10 +727,10 @@ private Properties createPropertiesForHiveMapHBaseColumnFamilyII_II() { "key,valbyte,valshort,valint,vallong,valfloat,valdouble,valstring,valbool"); tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string:map:map:map:map:" - + "map:map:map:map"); + + "map:map:map:map"); tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key#s,cf-byte:#s:s,cf-short:#s:s,cf-int:#s:s,cf-long:#s:s,cf-float:#s:s,cf-double:#s:s," + - "cf-string:#s:s,cf-bool:#s:s"); + "cf-string:#s:s,cf-bool:#s:s"); tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary"); return tbl; } @@ -662,9 +739,9 @@ private void deserializeAndSerializeHiveMapHBaseColumnFamilyII( HBaseSerDe hbaseSerDe, Result r, Put p, - Object [] expectedData, - byte [][] columnFamilies, - byte [][] columnQualifiersAndValues) throws SerDeException { + Object[] expectedData, + byte[][] columnFamilies, + byte[][] columnQualifiersAndValues) throws SerDeException { StructObjectInspector soi = (StructObjectInspector) hbaseSerDe.getObjectInspector(); List fieldRefs = soi.getAllStructFieldRefs(); @@ -681,7 +758,7 @@ private void deserializeAndSerializeHiveMapHBaseColumnFamilyII( assertEquals(expectedData[j], ((LazyPrimitive) fieldData).getWritableObject()); } else if (fieldData instanceof LazyHBaseCellMap) { LazyPrimitive lazyPrimitive = (LazyPrimitive) - ((LazyHBaseCellMap) fieldData).getMapValueElement(expectedData[j]); + ((LazyHBaseCellMap) fieldData).getMapValueElement(expectedData[j]); assertEquals(expectedData[j], lazyPrimitive.getWritableObject()); } else { fail("Error: field data not an instance of LazyPrimitive or LazyHBaseCellMap"); @@ -934,6 +1011,592 @@ private void 
deserializeAndSerializeHBaseCompositeKey(HBaseSerDe serDe, Result r assertEquals("Serialized put:", p.toString(), put.toString()); } + public void testHBaseSerDeWithAvroSchemaInline() throws SerDeException, IOException { + byte[] cfa = "cola".getBytes(); + + byte[] qualAvro = "avro".getBytes(); + + byte[] rowKey = Bytes.toBytes("test-row1"); + + // Data + List kvs = new ArrayList(); + + byte[] avroData = getTestAvroBytesFromSchema(RECORD_SCHEMA); + + kvs.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); + + Result r = new Result(kvs); + + Put p = new Put(rowKey); + + // Post serialization, separators are automatically inserted between different fields in the + // struct. Currently there is no way to disable that. So the workaround here is to pad the + // data with the separator bytes before creating a "Put" object + p.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); + + Object[] expectedFieldsData = {new String("test-row1"), new String("[[42, true, 42432234234]]")}; + + // Create, initialize, and test the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createPropertiesForHiveAvroSchemaInline(); + serDe.initialize(conf, tbl); + + deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData); + } + + private Properties createPropertiesForHiveAvroSchemaInline() { + Properties tbl = new Properties(); + tbl.setProperty("cola.avro.serialization.type", "avro"); + tbl.setProperty("cola.avro." + AvroSerdeUtils.SCHEMA_LITERAL, RECORD_SCHEMA); + tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key,cola:avro"); + tbl.setProperty(HBaseSerDe.HBASE_AUTOGENERATE_STRUCT, "true"); + + return tbl; + } + + public void testHBaseSerDeWithForwardEvolvedSchema() throws SerDeException, IOException { + byte[] cfa = "cola".getBytes(); + + byte[] qualAvro = "avro".getBytes(); + + byte[] rowKey = Bytes.toBytes("test-row1"); + + // Data + List kvs = new ArrayList(); + + byte[] avroData = getTestAvroBytesFromSchema(RECORD_SCHEMA); + + kvs.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); + + Result r = new Result(kvs); + + Put p = new Put(rowKey); + + // Post serialization, separators are automatically inserted between different fields in the + // struct. Currently there is no way to disable that. So the workaround here is to pad the + // data with the separator bytes before creating a "Put" object + p.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); + + Object[] expectedFieldsData = {new String("test-row1"), + new String("[[42, test, true, 42432234234]]")}; + + // Create, initialize, and test the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createPropertiesForHiveAvroForwardEvolvedSchema(); + serDe.initialize(conf, tbl); + + deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData); + } + + private Properties createPropertiesForHiveAvroForwardEvolvedSchema() { + Properties tbl = new Properties(); + tbl.setProperty("cola.avro.serialization.type", "avro"); + tbl.setProperty("cola.avro." + AvroSerdeUtils.SCHEMA_LITERAL, RECORD_SCHEMA_EVOLVED);
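+ // The evolved schema above serves as the reader schema, while the test data was written with + // the original RECORD_SCHEMA, so this test exercises forward schema evolution.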
+ tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key,cola:avro"); + tbl.setProperty(HBaseSerDe.HBASE_AUTOGENERATE_STRUCT, "true"); + + return tbl; + } + + public void testHBaseSerDeWithBackwardEvolvedSchema() throws SerDeException, IOException { + byte[] cfa = "cola".getBytes(); + + byte[] qualAvro = "avro".getBytes(); + + byte[] rowKey = Bytes.toBytes("test-row1"); + + // Data + List kvs = new ArrayList(); + + byte[] avroData = getTestAvroBytesFromSchema(RECORD_SCHEMA_EVOLVED); + + kvs.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); + + Result r = new Result(kvs); + + Put p = new Put(rowKey); + + // Post serialization, separators are automatically inserted between different fields in the + // struct. Currently there is no way to disable that. So the workaround here is to pad the + // data with the separator bytes before creating a "Put" object + p.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); + + Object[] expectedFieldsData = {new String("test-row1"), new String("[[42, true, 42432234234]]")}; + + // Create, initialize, and test the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createPropertiesForHiveAvroBackwardEvolvedSchema(); + serDe.initialize(conf, tbl); + + deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData); + } + + private Properties createPropertiesForHiveAvroBackwardEvolvedSchema() { + Properties tbl = new Properties(); + tbl.setProperty("cola.avro.serialization.type", "avro"); + tbl.setProperty("cola.avro." + AvroSerdeUtils.SCHEMA_LITERAL, RECORD_SCHEMA); + tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key,cola:avro"); + tbl.setProperty(HBaseSerDe.HBASE_AUTOGENERATE_STRUCT, "true"); + + return tbl; + } + + public void testHBaseSerDeWithAvroSerClass() throws SerDeException, IOException { + byte[] cfa = "cola".getBytes(); + + byte[] qualAvro = "avro".getBytes(); + + byte[] rowKey = Bytes.toBytes("test-row1"); + + // Data + List kvs = new ArrayList(); + + byte[] avroData = getTestAvroBytesFromClass1(1); + + kvs.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); + + Result r = new Result(kvs); + + Put p = new Put(rowKey); + + // Post serialization, separators are automatically inserted between different fields in the + // struct. Currently there is no way to disable that. So the workaround here is to pad the + // data with the separator bytes before creating a "Put" object + p.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); + + Object[] expectedFieldsData = { + new String("test-row1"), + new String( + "[Avro Employee1, 11111, 25, FEMALE, [[[Avro First Address1, Avro Second Address1, Avro City1, 123456, 0:[999, 1234567890], null, {testkey=testvalue}], " + + "[Avro First Address1, Avro Second Address1, Avro City1, 123456, 0:[999, 1234567890], null, {testkey=testvalue}]], " + + "[999, 1234567890], [999, 1234455555]]]")}; + + // Create, initialize, and test the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createPropertiesForHiveAvroSerClass(); + serDe.initialize(conf, tbl); + + deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData); + } + + private Properties createPropertiesForHiveAvroSerClass() { + Properties tbl = new Properties(); + tbl.setProperty("cola.avro.serialization.type", "avro"); + tbl.setProperty("cola.avro." + serdeConstants.SERIALIZATION_CLASS, + "org.apache.hadoop.hive.hbase.avro.Employee");
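+ // No schema literal or URL is configured for this variant; the SerDe is expected to derive + // the Avro schema from the generated Employee class named above.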
+ tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key,cola:avro"); + tbl.setProperty(HBaseSerDe.HBASE_AUTOGENERATE_STRUCT, "true"); + + return tbl; + } + + public void testHBaseSerDeWithAvroSchemaUrl() throws SerDeException, IOException { + byte[] cfa = "cola".getBytes(); + + byte[] qualAvro = "avro".getBytes(); + + byte[] rowKey = Bytes.toBytes("test-row1"); + + // Data + List kvs = new ArrayList(); + + byte[] avroData = getTestAvroBytesFromSchema(RECORD_SCHEMA); + + kvs.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); + + Result r = new Result(kvs); + + Put p = new Put(rowKey); + + // Post serialization, separators are automatically inserted between different fields in the + // struct. Currently there is no way to disable that. So the workaround here is to pad the + // data with the separator bytes before creating a "Put" object + p.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); + + Object[] expectedFieldsData = {new String("test-row1"), new String("[[42, true, 42432234234]]")}; + + MiniDFSCluster miniDfs = null; + + try { + // MiniDFSCluster litters files and folders all over the place. + miniDfs = new MiniDFSCluster(new Configuration(), 1, true, null); + + miniDfs.getFileSystem().mkdirs(new Path("/path/to/schema")); + FSDataOutputStream out = miniDfs.getFileSystem().create( + new Path("/path/to/schema/schema.avsc")); + out.writeBytes(RECORD_SCHEMA); + out.close(); + String onHDFS = miniDfs.getFileSystem().getUri() + "/path/to/schema/schema.avsc"; + + // Create, initialize, and test the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createPropertiesForHiveAvroSchemaUrl(onHDFS); + serDe.initialize(conf, tbl); + + deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData); + } finally { + // Tear down the cluster + if (miniDfs != null) { + miniDfs.shutdown(); + } + } + } + + private Properties createPropertiesForHiveAvroSchemaUrl(String schemaUrl) { + Properties tbl = new Properties(); + tbl.setProperty("cola.avro.serialization.type", "avro"); + tbl.setProperty("cola.avro." + AvroSerdeUtils.SCHEMA_URL, schemaUrl); + tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key,cola:avro"); + tbl.setProperty(HBaseSerDe.HBASE_AUTOGENERATE_STRUCT, "true"); + + return tbl; + } + + public void testHBaseSerDeWithAvroExternalSchema() throws SerDeException, IOException { + byte[] cfa = "cola".getBytes(); + + byte[] qualAvro = "avro".getBytes(); + + byte[] rowKey = Bytes.toBytes("test-row1"); + + // Data + List kvs = new ArrayList(); + + byte[] avroData = getTestAvroBytesFromClass2(1); + + kvs.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); + + Result r = new Result(kvs); + + Put p = new Put(rowKey); + + // Post serialization, separators are automatically inserted between different fields in the + // struct. Currently there is no way to disable that.
So the workaround here is to pad the + // data with the separator bytes before creating a "Put" object + p.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); + + Object[] expectedFieldsData = { + new String("test-row1"), + new String( + "[Avro Employee1, 11111, 25, FEMALE, [[[Avro First Address1, Avro Second Address1, Avro City1, 123456, 0:[999, 1234567890], null, {testkey=testvalue}], [Avro First Address1, Avro Second Address1, Avro City1, 123456, 0:[999, 1234567890], null, {testkey=testvalue}]], " + + "[999, 1234567890], [999, 1234455555]]]")}; + + // Create, initialize, and test the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + + Properties tbl = createPropertiesForHiveAvroExternalSchema(); + serDe.initialize(conf, tbl); + + deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData); + } + + private Properties createPropertiesForHiveAvroExternalSchema() { + Properties tbl = new Properties(); + tbl.setProperty("cola.avro.serialization.type", "avro"); + tbl.setProperty(AvroSerdeUtils.SCHEMA_RETRIEVER, + "org.apache.hadoop.hive.hbase.HBaseTestAvroSchemaRetriever"); + tbl.setProperty("cola.avro." + serdeConstants.SERIALIZATION_CLASS, + "org.apache.hadoop.hive.hbase.avro.Employee"); + tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key,cola:avro"); + tbl.setProperty(HBaseSerDe.HBASE_AUTOGENERATE_STRUCT, "true"); + + return tbl; + } + + public void testHBaseSerDeWithHiveMapToHBaseAvroColumnFamily() throws Exception { + byte[] cfa = "cola".getBytes(); + + byte[] qualAvroA = "prefixA_avro1".getBytes(); + byte[] qualAvroB = "prefixB_avro2".getBytes(); + byte[] qualAvroC = "prefixB_avro3".getBytes(); + + List qualifiers = new ArrayList(); + qualifiers.add(new Text("prefixA_avro1")); + qualifiers.add(new Text("prefixB_avro2")); + qualifiers.add(new Text("prefixB_avro3")); + + List expectedQualifiers = new ArrayList(); + expectedQualifiers.add(new Text("prefixB_avro2")); + expectedQualifiers.add(new Text("prefixB_avro3")); + + byte[] rowKey = Bytes.toBytes("test-row1"); + + // Data + List kvs = new ArrayList(); + + byte[] avroDataA = getTestAvroBytesFromSchema(RECORD_SCHEMA); + byte[] avroDataB = getTestAvroBytesFromClass1(1); + byte[] avroDataC = getTestAvroBytesFromClass1(2); + + kvs.add(new KeyValue(rowKey, cfa, qualAvroA, avroDataA)); + kvs.add(new KeyValue(rowKey, cfa, qualAvroB, avroDataB)); + kvs.add(new KeyValue(rowKey, cfa, qualAvroC, avroDataC)); + + Result r = new Result(kvs); + + Put p = new Put(rowKey); + + // Post serialization, separators are automatically inserted between different fields in the + // struct. Currently there is no way to disable that. So the workaround here is to pad the + // data with the separator bytes before creating a "Put" object
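+ // (Bytes.padTail below appends 11 filler bytes to each value; the count is assumed to match + // the number of separator bytes emitted for this particular nested struct.)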
+ p.add(new KeyValue(rowKey, cfa, qualAvroB, Bytes.padTail(avroDataB, 11))); + p.add(new KeyValue(rowKey, cfa, qualAvroC, Bytes.padTail(avroDataC, 11))); + + Object[] expectedFieldsData = { + new Text("test-row1"), + new String( + "[Avro Employee1, 11111, 25, FEMALE, [[[Avro First Address1, Avro Second Address1, Avro City1, 123456, 0:[999, 1234567890], null, {testkey=testvalue}], [Avro First Address1, Avro Second Address1, Avro City1, 123456, 0:[999, 1234567890], null, {testkey=testvalue}]], " + + "[999, 1234567890], [999, 1234455555]]]"), + new String( + "[Avro Employee2, 11111, 25, FEMALE, [[[Avro First Address2, Avro Second Address2, Avro City2, 123456, 0:[999, 1234567890], null, {testkey=testvalue}], [Avro First Address2, Avro Second Address2, Avro City2, 123456, 0:[999, 1234567890], null, {testkey=testvalue}]], " + + "[999, 1234567890], [999, 1234455555]]]")}; + + int[] expectedMapSize = new int[] {2}; + + // Create, initialize, and test the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createPropertiesForHiveAvroColumnFamilyMap(); + serDe.initialize(conf, tbl); + + Object notPresentKey = new Text("prefixA_avro1"); + + deserializeAndSerializeHiveStructColumnFamily(serDe, r, p, expectedFieldsData, expectedMapSize, + expectedQualifiers, + notPresentKey); + } + + private Properties createPropertiesForHiveAvroColumnFamilyMap() { + Properties tbl = new Properties(); + tbl.setProperty("cola.prefixB_.serialization.type", "avro"); + tbl.setProperty("cola.prefixB_." + serdeConstants.SERIALIZATION_CLASS, + "org.apache.hadoop.hive.hbase.avro.Employee"); + tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, "cola:prefixB_.*"); + tbl.setProperty(HBaseSerDe.HBASE_AUTOGENERATE_STRUCT, "true"); + tbl.setProperty(LazySimpleSerDe.SERIALIZATION_EXTEND_NESTING_LEVELS, "true"); + + return tbl; + } + + private void deserializeAndSerializeHiveAvro(HBaseSerDe serDe, Result r, Put p, + Object[] expectedFieldsData) + throws SerDeException, IOException { + StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector(); + + List fieldRefs = soi.getAllStructFieldRefs(); + + Object row = serDe.deserialize(new ResultWritable(r)); + + for (int j = 0; j < fieldRefs.size(); j++) { + Object fieldData = soi.getStructFieldData(row, fieldRefs.get(j)); + assertNotNull(fieldData); + assertEquals(expectedFieldsData[j], fieldData.toString().trim()); + } + + // Now serialize + Put put = ((PutWritable) serDe.serialize(row, soi)).getPut(); + + assertNotNull(put); + assertEquals(p.getFamilyCellMap(), put.getFamilyCellMap()); + } + + private void deserializeAndSerializeHiveStructColumnFamily(HBaseSerDe serDe, Result r, Put p, + Object[] expectedFieldsData, + int[] expectedMapSize, List expectedQualifiers, Object notPresentKey) + throws SerDeException, IOException { + StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector(); + + List fieldRefs = soi.getAllStructFieldRefs(); + + Object row = serDe.deserialize(new ResultWritable(r)); + + int k = 0; + + for (int i = 0; i < fieldRefs.size(); i++) { + Object fieldData = soi.getStructFieldData(row, fieldRefs.get(i)); + assertNotNull(fieldData); + + if (fieldData instanceof LazyPrimitive) { + assertEquals(expectedFieldsData[i], ((LazyPrimitive) fieldData).getWritableObject()); + } else if (fieldData instanceof LazyHBaseCellMap) { + + for (int j = 0; j < ((LazyHBaseCellMap)
fieldData).getMapSize(); j++) { + assertEquals(expectedFieldsData[k + 1], + ((LazyHBaseCellMap) fieldData).getMapValueElement(expectedQualifiers.get(k)) + .toString().trim()); + k++; + } + + assertEquals(expectedMapSize[i - 1], ((LazyHBaseCellMap) fieldData).getMapSize()); + + // Make sure that the unwanted key is not present in the map + assertNull(((LazyHBaseCellMap) fieldData).getMapValueElement(notPresentKey)); + + } else { + fail("Error: field data not an instance of LazyPrimitive or LazyHBaseCellMap"); + } + } + + SerDeUtils.getJSONString(row, soi); + + // Now serialize + Put put = ((PutWritable) serDe.serialize(row, soi)).getPut(); + + assertNotNull(put); + } + + private byte[] getTestAvroBytesFromSchema(String schemaToUse) throws IOException { + Schema s = Schema.parse(schemaToUse); + GenericData.Record record = new GenericData.Record(s); + GenericData.Record innerRecord = new GenericData.Record(s.getField("aRecord").schema()); + innerRecord.put("int1", 42); + innerRecord.put("boolean1", true); + innerRecord.put("long1", 42432234234l); + + if (schemaToUse.equals(RECORD_SCHEMA_EVOLVED)) { + innerRecord.put("string1", "new value"); + } + + record.put("aRecord", innerRecord); + + DatumWriter datumWriter = new GenericDatumWriter(s); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); + dataFileWriter.create(s, out); + dataFileWriter.append(record); + dataFileWriter.close(); + + byte[] data = out.toByteArray(); + + out.close(); + return data; + } + + private byte[] getTestAvroBytesFromClass1(int i) throws IOException { + Employee employee = new Employee(); + + employee.setEmployeeName("Avro Employee" + i); + employee.setEmployeeID(11111L); + employee.setGender(org.apache.hadoop.hive.hbase.avro.Gender.FEMALE); + employee.setAge(25L); + + org.apache.hadoop.hive.hbase.avro.Address address = new org.apache.hadoop.hive.hbase.avro.Address(); + + address.setAddress1("Avro First Address" + i); + address.setAddress2("Avro Second Address" + i); + address.setCity("Avro City" + i); + address.setZipcode(123456L); + + Map metadata = new HashMap(); + + metadata.put("testkey", "testvalue"); + + address.setMetadata(metadata); + + org.apache.hadoop.hive.hbase.avro.HomePhone hPhone = new org.apache.hadoop.hive.hbase.avro.HomePhone(); + + hPhone.setAreaCode(999L); + hPhone.setNumber(1234567890L); + + org.apache.hadoop.hive.hbase.avro.OfficePhone oPhone = new org.apache.hadoop.hive.hbase.avro.OfficePhone(); + + oPhone.setAreaCode(999L); + oPhone.setNumber(1234455555L); + + org.apache.hadoop.hive.hbase.avro.ContactInfo contact = new org.apache.hadoop.hive.hbase.avro.ContactInfo(); + + List addresses = new ArrayList(); + address.setCounty(hPhone); // set value for the union type + addresses.add(address); + addresses.add(address); + + contact.setAddress(addresses); + + contact.setHomePhone(hPhone); + contact.setOfficePhone(oPhone); + + employee.setContactInfo(contact); + + DatumWriter datumWriter = new SpecificDatumWriter(Employee.class); + DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + dataFileWriter.create(employee.getSchema(), out); + dataFileWriter.append(employee); + dataFileWriter.close(); + + return out.toByteArray(); + } + + private byte[] getTestAvroBytesFromClass2(int i) throws IOException { + Employee employee = new Employee(); + + employee.setEmployeeName("Avro Employee" + i); + employee.setEmployeeID(11111L); + 
employee.setGender(org.apache.hadoop.hive.hbase.avro.Gender.FEMALE); + employee.setAge(25L); + + org.apache.hadoop.hive.hbase.avro.Address address = new org.apache.hadoop.hive.hbase.avro.Address(); + + address.setAddress1("Avro First Address" + i); + address.setAddress2("Avro Second Address" + i); + address.setCity("Avro City" + i); + address.setZipcode(123456L); + + Map metadata = new HashMap(); + + metadata.put("testkey", "testvalue"); + + address.setMetadata(metadata); + + org.apache.hadoop.hive.hbase.avro.HomePhone hPhone = new org.apache.hadoop.hive.hbase.avro.HomePhone(); + + hPhone.setAreaCode(999L); + hPhone.setNumber(1234567890L); + + org.apache.hadoop.hive.hbase.avro.OfficePhone oPhone = new org.apache.hadoop.hive.hbase.avro.OfficePhone(); + + oPhone.setAreaCode(999L); + oPhone.setNumber(1234455555L); + + org.apache.hadoop.hive.hbase.avro.ContactInfo contact = new org.apache.hadoop.hive.hbase.avro.ContactInfo(); + + List addresses = new ArrayList(); + address.setCounty(hPhone); // set value for the union type + addresses.add(address); + addresses.add(address); + + contact.setAddress(addresses); + + contact.setHomePhone(hPhone); + contact.setOfficePhone(oPhone); + + employee.setContactInfo(contact); + + DatumWriter employeeWriter = new SpecificDatumWriter(Employee.class); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); + + // write out a header for the payload + out.write(TEST_BYTE_ARRAY); + + employeeWriter.write(employee, encoder); + + encoder.flush(); + + return out.toByteArray(); + } + class TestStruct { String f1; String f2; diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java index 8beffd7..1461aa6 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.serde2.avro; +import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.DataOutputStream; @@ -25,6 +26,7 @@ import java.rmi.server.UID; import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; @@ -93,6 +95,24 @@ public void readFields(DataInput in) throws IOException { record = gdr.read(record, binaryDecoder); } + public void readFields(byte[] bytes, int offset, int length, Schema writerSchema, Schema readerSchema) throws IOException { + record = new GenericData.Record(writerSchema); + binaryDecoder = DecoderFactory.get().binaryDecoder(bytes, offset, length - offset, + binaryDecoder); + GenericDatumReader gdr = new GenericDatumReader(writerSchema, readerSchema); + record = gdr.read(null, binaryDecoder); + } + + public void readFields(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException{ + record = new GenericData.Record(writerSchema); + GenericDatumReader gdr = new GenericDatumReader(); + gdr.setExpected(readerSchema); + ByteArrayInputStream is = new ByteArrayInputStream(bytes); + DataFileStream dfr = new DataFileStream(is, gdr); + record = dfr.next(record); + dfr.close(); + } + public UID getRecordReaderID() { return recordReaderID; } diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java 
serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java new file mode 100644 index 0000000..fd15798 --- /dev/null +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java @@ -0,0 +1,530 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.serde2.avro; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.commons.lang.ClassUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyArray; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyMap; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyStruct; +import org.apache.hadoop.hive.serde2.lazy.LazyUnion; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyListObjectInspector; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyUnionObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; + +/** + * Lazy {@link ObjectInspector} for Avro serialization + * */ +public class AvroLazyObjectInspector extends LazySimpleStructObjectInspector { + + /** + * Reader {@link Schema} for the Avro data + * */ + private Schema readerSchema; + + /** + * {@link AvroSchemaRetriever} to retrieve the Avro schema + * */ + private AvroSchemaRetriever schemaRetriever; + + /** + * LOGGER + * */ + public static final Log LOG = LogFactory.getLog(AvroLazyObjectInspector.class); + + /** + * Constructor + * + * @param structFieldNames + * fields within the given Avro object + * @param structFieldObjectInspectors + * object inspectors for the fields + * @param structFieldComments + * comments for the given fields + * @param separator + *
separator between different fields + * @param nullSequence + * sequence to represent null value + * @param lastColumnTakesRest + * whether the last column of the struct should take the rest of the + * row if there are extra fields. + * @param escaped + * whether the data is escaped or not + * @param escapeChar + * if escaped is true, the escape character + * */ + public AvroLazyObjectInspector(List structFieldNames, + List structFieldObjectInspectors, List structFieldComments, + byte separator, Text nullSequence, boolean lastColumnTakesRest, boolean escaped, + byte escapeChar) { + super(structFieldNames, structFieldObjectInspectors, structFieldComments, separator, + nullSequence, + lastColumnTakesRest, escaped, escapeChar); + } + + /** + * Set the reader schema for the {@link AvroLazyObjectInspector} to the given schema + * */ + public void setReaderSchema(Schema readerSchema) { + this.readerSchema = readerSchema; + } + + /** + * Set the {@link AvroSchemaRetriever} for the {@link AvroLazyObjectInspector} to the given + * instance + * + * @param schemaRetriever + * the schema retriever to be set + * */ + public void setSchemaRetriever(AvroSchemaRetriever schemaRetriever) { + this.schemaRetriever = schemaRetriever; + } + + @SuppressWarnings("unchecked") + @Override + public Object getStructFieldData(Object data, StructField fieldRef) { + if (data == null) { + return null; + } + + if (!(fieldRef instanceof MyField)) { + throw new IllegalArgumentException("fieldRef has to be of type MyField"); + } + + MyField f = (MyField) fieldRef; + int fieldID = f.getFieldID(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Getting struct field data for field: [" + f.getFieldName() + "] on data [" + + data.getClass() + "]"); + } + + if (data instanceof LazyStruct) { + LazyStruct row = (LazyStruct) data; + + // get the field out of struct + Object rowField = row.getField(fieldID); + + if (rowField instanceof LazyStruct) { + + if (LOG.isDebugEnabled()) { + LOG.debug("Deserializing struct [" + rowField.getClass() + "]"); + } + + return deserializeStruct(rowField, f.getFieldName()); + + } else if (rowField instanceof LazyMap) { + // We have found a map. Systematically deserialize the values of the map and return the + // map + LazyMap lazyMap = (LazyMap) rowField; + + for (Entry entry : lazyMap.getMap().entrySet()) { + Object _key = entry.getKey(); + Object _value = entry.getValue(); + + if (_value instanceof LazyStruct) { + lazyMap.getMap().put(_key, deserializeStruct(_value, f.getFieldName())); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Returning a lazy map for field [" + f.getFieldName() + "]"); + } + + return lazyMap; + + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Returning [" + rowField.toString() + "] for field [" + f.getFieldName() + "]"); + } + + // Just return the object. We need no further operation on it + return rowField; + } + } else { + + // The Avro deserializer deserializes our object and returns a list of objects that + // Hive can operate on. Here we should be getting the same object back.
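+ // (For example, a record with fields (int1, boolean1, long1) would arrive here as a List + // such as [42, true, 42432234234L], with fieldID simply indexing into that list.)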
+ if (!(data instanceof List)) { + throw new IllegalArgumentException("data should be an instance of list"); + } + + if (!(fieldID < ((List) data).size())) { + return null; + } + + // lookup the field corresponding to the given field ID and return + Object field = ((List) data).get(fieldID); + + if (field == null) { + return null; + } + + // convert to a lazy object and return + return toLazyObject(field, fieldRef.getFieldObjectInspector()); + } + } + + @Override + public List getStructFieldsDataAsList(Object data) { + if (data == null) { + return null; + } + + List result = new ArrayList(fields.size()); + + for (int i = 0; i < fields.size(); i++) { + result.add(getStructFieldData(data, fields.get(i))); + } + + return result; + } + + /** + * Deserialize the given struct object + * + * @param struct + * the object to deserialize + * @param fieldName + * name of the field we are currently operating on + * @return a deserialized object that Hive can further operate on + * @throws AvroObjectInspectorException + * if something goes wrong during deserialization + * */ + private Object deserializeStruct(Object struct, String fieldName) { + byte[] data = ((LazyStruct) struct).getBytes(); + AvroDeserializer deserializer = new AvroDeserializer(); + + if (data == null) { + return null; + } + + if (readerSchema == null && schemaRetriever == null) { + throw new IllegalArgumentException("reader schema or schemaRetriever must be set for field [" + + fieldName + "]"); + } + + Schema ws = null; + Schema rs = null; + int offset = 0; + + AvroGenericRecordWritable avroWritable = new AvroGenericRecordWritable(); + + if (readerSchema == null) { + + rs = schemaRetriever.retrieveReaderSchema(data); + + if (rs == null) { + // still nothing, raise an exception + throw new IllegalStateException( + "A valid reader schema could not be retrieved either directly or from the schema retriever for field [" + + fieldName + "]"); + } + + ws = schemaRetriever.retrieveWriterSchema(data); + + if (ws == null) { + throw new IllegalStateException( + "Null writer schema retrieved from schemaRetriever for field [" + fieldName + "]"); + } + + // adjust the data bytes according to any possible offset that was provided + offset = schemaRetriever.getOffset(); + + if (data.length < offset) { + throw new IllegalArgumentException("Data size cannot be less than [" + offset + + "]. Found [" + data.length + "]"); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Retrieved writer Schema: " + ws.toString()); + LOG.debug("Retrieved reader Schema: " + rs.toString()); + } + + try { + avroWritable.readFields(data, offset, data.length, ws, rs); + } catch (IOException ioe) { + throw new AvroObjectInspectorException("Error deserializing avro payload", ioe); + } + } else { + // a reader schema was provided + if (schemaRetriever != null) { + // a schema retriever has been provided as well.
Attempt to read the writer schema from the + // retriever + ws = schemaRetriever.retrieveWriterSchema(data); + + if (ws == null) { + throw new IllegalStateException( + "Null writer schema retrieved from schemaRetriever for field [" + fieldName + "]"); + } + } else { + // attempt retrieving the schema from the data + ws = retrieveSchemaFromBytes(data); + } + + rs = readerSchema; + + try { + avroWritable.readFields(data, ws, rs); + } catch (IOException ioe) { + throw new AvroObjectInspectorException("Error deserializing avro payload", ioe); + } + } + + AvroObjectInspectorGenerator oiGenerator = null; + Object deserializedObject = null; + + try { + oiGenerator = new AvroObjectInspectorGenerator(rs); + deserializedObject = deserializer.deserialize(oiGenerator.getColumnNames(), + oiGenerator.getColumnTypes(), + avroWritable, rs); + } catch (SerDeException se) { + throw new AvroObjectInspectorException("Error deserializing avro payload", se); + } + + return deserializedObject; + } + + /** + * Retrieve schema from the given bytes + * + * @return the retrieved {@link Schema schema} + * */ + private Schema retrieveSchemaFromBytes(byte[] data) { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + DatumReader reader = new GenericDatumReader(); + + Schema schema = null; + + try { + // dfs is AutoCloseable + @SuppressWarnings("resource") + DataFileStream dfs = new DataFileStream(bais, reader); + schema = dfs.getSchema(); + } catch (IOException ioe) { + throw new AvroObjectInspectorException("An error occurred retrieving schema from bytes", ioe); + } + + return schema; + } + + /** + * Converts the given field to a lazy object + * + * @param field + * to be converted to a lazy object + * @param fieldOI + * {@link ObjectInspector} for the given field + * @return the converted lazy object + * */ + private Object toLazyObject(Object field, ObjectInspector fieldOI) { + if (isPrimitive(field.getClass())) { + return toLazyPrimitiveObject(field, fieldOI); + } else if (fieldOI instanceof LazyListObjectInspector) { + return toLazyListObject(field, fieldOI); + } else if (field instanceof StandardUnion) { + return toLazyUnionObject(field, fieldOI); + } else if (fieldOI instanceof LazyMapObjectInspector) { + return toLazyMapObject(field, fieldOI); + } else { + return field; + } + } + + /** + * Convert the given object to a lazy object using the given {@link ObjectInspector} + * + * @param obj + * Object to be converted to a {@link LazyObject} + * @param oi + * ObjectInspector used for the conversion + * @return the created {@link LazyObject lazy object} + * */ + private LazyObject toLazyPrimitiveObject(Object obj, ObjectInspector oi) { + if (obj == null) { + return null; + } + + LazyObject lazyObject = LazyFactory.createLazyObject(oi); + ByteArrayRef ref = new ByteArrayRef(); + + String objAsString = obj.toString().trim(); + + ref.setData(objAsString.getBytes()); + + // initialize the lazy object + lazyObject.init(ref, 0, ref.getData().length); + + return lazyObject; + } + + /** + * Convert the given object to a lazy object using the given {@link ObjectInspector} + * + * @param obj + * Object to be converted to a {@link LazyObject} + * @param objectInspector + * ObjectInspector used for the conversion + * @return the created {@link LazyObject lazy object} + * */ + private Object toLazyListObject(Object obj, ObjectInspector objectInspector) { + if (obj == null) { + return null; + } + + List listObj = (List) obj; + + LazyArray retList = (LazyArray) LazyFactory.createLazyObject(objectInspector); + + List lazyList = retList.getList();
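+ // each element is converted recursively below, so nested records or maps inside the array + // are handled as well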
+ + ObjectInspector listElementOI = ((ListObjectInspector) objectInspector) + .getListElementObjectInspector(); + + for (int i = 0; i < listObj.size(); i++) { + lazyList.add(toLazyObject(listObj.get(i), listElementOI)); + } + + return retList; + } + + /** + * Convert the given object to a lazy object using the given {@link ObjectInspector} + * + * @param obj + * Object to be converted to a {@link LazyObject} + * @param objectInspector + * ObjectInspector used for the conversion + * @return the created {@link LazyObject lazy object} + * */ + @SuppressWarnings({"rawtypes", "unchecked"}) + private Object toLazyMapObject(Object obj, ObjectInspector objectInspector) { + if (obj == null) { + return null; + } + + // Avro guarantees that the key will be of type string. So we just need to worry about + // deserializing the value here + + LazyMap lazyMap = (LazyMap) LazyFactory.createLazyObject(objectInspector); + + Map map = lazyMap.getMap(); + + Map origMap = (Map) obj; + + ObjectInspector keyObjectInspector = ((MapObjectInspector) objectInspector) + .getMapKeyObjectInspector(); + ObjectInspector valueObjectInspector = ((MapObjectInspector) objectInspector) + .getMapValueObjectInspector(); + + for (Entry entry : origMap.entrySet()) { + Object value = entry.getValue(); + + map.put(toLazyPrimitiveObject(entry.getKey(), keyObjectInspector), + toLazyObject(value, valueObjectInspector)); + } + + return lazyMap; + } + + /** + * Convert the given object to a lazy object using the given {@link ObjectInspector} + * + * @param obj + * Object to be converted to a {@link LazyObject} + * @param objectInspector + * ObjectInspector used for the conversion + * @return the created {@link LazyObject lazy object} + * */ + private Object toLazyUnionObject(Object obj, ObjectInspector objectInspector) { + if (obj == null) { + return null; + } + + if (!(objectInspector instanceof LazyUnionObjectInspector)) { + throw new IllegalArgumentException( + "Invalid objectinspector found. Expected LazyUnionObjectInspector, Found " + + objectInspector.getClass()); + } + + StandardUnion standardUnion = (StandardUnion) obj; + + // Grab the tag and the field + byte tag = standardUnion.getTag(); + Object field = standardUnion.getObject(); + + ObjectInspector fieldOI = ((LazyUnionObjectInspector) objectInspector).getObjectInspectors() + .get(tag); + + // convert to lazy object + Object convertedObj = null; + + if (field != null) { + convertedObj = toLazyObject(field, fieldOI); + } + + if (convertedObj == null) { + return null; + } + + LazyUnion lazyUnion = (LazyUnion) LazyFactory.createLazyObject(objectInspector); + + lazyUnion.setField(convertedObj); + lazyUnion.setTag(tag); + + return lazyUnion; + } + + /** + * Determines if the given class is a primitive or a wrapper to a primitive. Note that even + * though a String may not be a primitive in the traditional sense, it is treated as one here + * because it is not a struct. + * + * @param clazz + * input class + * @return true, if the object is a primitive or a wrapper to a primitive, false otherwise.
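+ * For example, {@code int.class}, {@code Integer.class} and {@code String.class} are all + * treated as primitives here, while a generated Avro record class is not.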
+ * */ + private boolean isPrimitive(Class clazz) { + return clazz.isPrimitive() || ClassUtils.wrapperToPrimitive(clazz) != null + || clazz.getSimpleName().equals("String"); + } +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorException.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorException.java new file mode 100644 index 0000000..022bddd --- /dev/null +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.serde2.avro; + +/** + * Exception for the {@link AvroLazyObjectInspector} + * */ +public class AvroObjectInspectorException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public AvroObjectInspectorException() { + super(); + } + + public AvroObjectInspectorException(String message) { + super(message); + } + + public AvroObjectInspectorException(Throwable cause) { + super(cause); + } + + public AvroObjectInspectorException(String message, Throwable cause) { + super(message, cause); + } + +} \ No newline at end of file diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java index 46cdb4f..64a6543 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java @@ -40,7 +40,7 @@ * * A list of those fields equivalent types in Hive * * An ObjectInspector capable of working with an instance of that datum. */ -class AvroObjectInspectorGenerator { +public class AvroObjectInspectorGenerator { final private List columnNames; final private List columnTypes; final private ObjectInspector oi; diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSchemaRetriever.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSchemaRetriever.java new file mode 100644 index 0000000..7a0c79e --- /dev/null +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSchemaRetriever.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.serde2.avro; + +import org.apache.avro.Schema; + +/** + * Retrieves the Avro schema from the given source. "Source" is a loose term here: it can range + * from an HDFS URL pointing to the schema to something as simple as a {@link Properties + * properties} file with a key-value mapping to the schema. For cases where the + * {@link Schema schema} is a part of the serialized data itself, "Source" would refer to the + * data bytes from which the {@link Schema schema} has to be retrieved. + * + * */ +public abstract class AvroSchemaRetriever { + + /** + * Retrieve the writer Avro schema from the given source + * + * @param source + * source from which the schema has to be retrieved + * @return the retrieved writer {@link Schema} + * */ + public abstract Schema retrieveWriterSchema(Object source); + + /** + * Retrieve the reader Avro schema from the given source + * + * @param source + * source from which the schema has to be retrieved + * @return the retrieved reader {@link Schema} + * */ + public Schema retrieveReaderSchema(Object source) { + return null; + } + + /** + * Possible offset associated with the schema. This is useful when the schema is stored inline + * along with the data. + *
+ * Defaulted to zero. Consumers can choose to override this value to provide a custom offset.
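+ * (For example, a hypothetical retriever whose payload carries a fixed four-byte header before + * the Avro bytes would return 4 here.)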
+ * */ + public int getOffset() { + return 0; + } +} \ No newline at end of file diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java index 9d58d13..c11591c 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java @@ -44,6 +44,7 @@ public static final String SCHEMA_LITERAL = "avro.schema.literal"; public static final String SCHEMA_URL = "avro.schema.url"; + public static final String SCHEMA_RETRIEVER = "avro.schema.retriever"; public static final String SCHEMA_NONE = "none"; public static final String EXCEPTION_MESSAGE = "Neither " + SCHEMA_LITERAL + " nor " + SCHEMA_URL + " specified, can't determine table schema"; diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java index 66f79ed..d3b00b8 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyListObjectInspector; @@ -53,6 +54,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; @@ -218,6 +220,32 @@ public static ObjectInspector createLazyObjectInspector(TypeInfo typeInfo, byte[] separator, int separatorIndex, Text nullSequence, boolean escaped, byte escapeChar) throws SerDeException { + return createLazyObjectInspector(typeInfo, separator, separatorIndex, nullSequence, + escaped, escapeChar, null); + } + + /** + * Create a hierarchical ObjectInspector for LazyObject with the given + * typeInfo. + * + * @param typeInfo + * The type information for the LazyObject + * @param separator + * The array of separators for delimiting each level + * @param separatorIndex + * The current level (for separators). List(array), struct uses 1 + * level of separator, and map uses 2 levels: the first one for + * delimiting entries, the second one for delimiting key and values. + * @param nullSequence + * The sequence of bytes representing NULL. 
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java
index 66f79ed..d3b00b8 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyListObjectInspector;
@@ -53,6 +54,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
@@ -218,6 +220,32 @@
   public static ObjectInspector createLazyObjectInspector(TypeInfo typeInfo,
       byte[] separator, int separatorIndex, Text nullSequence, boolean escaped,
       byte escapeChar) throws SerDeException {
+    return createLazyObjectInspector(typeInfo, separator, separatorIndex, nullSequence,
+        escaped, escapeChar, null);
+  }
+
+  /**
+   * Create a hierarchical ObjectInspector for LazyObject with the given typeInfo.
+   *
+   * @param typeInfo
+   *          The type information for the LazyObject
+   * @param separator
+   *          The array of separators for delimiting each level
+   * @param separatorIndex
+   *          The current level (for separators). List (array) and struct use one level of
+   *          separator, and map uses two levels: the first for delimiting entries, the second
+   *          for delimiting keys and values.
+   * @param nullSequence
+   *          The sequence of bytes representing NULL.
+   * @param option
+   *          {@link ObjectInspectorOptions option} for the object inspector
+   * @return The ObjectInspector
+   * @throws SerDeException
+   */
+  public static ObjectInspector createLazyObjectInspector(TypeInfo typeInfo,
+      byte[] separator, int separatorIndex, Text nullSequence, boolean escaped,
+      byte escapeChar, ObjectInspectorOptions option) throws SerDeException {
     ObjectInspector.Category c = typeInfo.getCategory();
     switch (c) {
     case PRIMITIVE:
@@ -227,9 +255,9 @@ public static ObjectInspector createLazyObjectInspector(TypeInfo typeInfo,
       return LazyObjectInspectorFactory.getLazySimpleMapObjectInspector(
           createLazyObjectInspector(((MapTypeInfo) typeInfo)
               .getMapKeyTypeInfo(), separator, separatorIndex + 2,
-              nullSequence, escaped, escapeChar), createLazyObjectInspector(
+              nullSequence, escaped, escapeChar, option), createLazyObjectInspector(
               ((MapTypeInfo) typeInfo).getMapValueTypeInfo(), separator,
-              separatorIndex + 2, nullSequence, escaped, escapeChar),
+              separatorIndex + 2, nullSequence, escaped, escapeChar, option),
           LazyUtils.getSeparator(separator, separatorIndex),
           LazyUtils.getSeparator(separator, separatorIndex+1),
           nullSequence, escaped, escapeChar);
@@ -237,7 +265,7 @@ public static ObjectInspector createLazyObjectInspector(TypeInfo typeInfo,
       return LazyObjectInspectorFactory.getLazySimpleListObjectInspector(
           createLazyObjectInspector(((ListTypeInfo) typeInfo)
               .getListElementTypeInfo(), separator, separatorIndex + 1,
-              nullSequence, escaped, escapeChar), LazyUtils.getSeparator(separator, separatorIndex),
+              nullSequence, escaped, escapeChar, option), LazyUtils.getSeparator(separator, separatorIndex),
           nullSequence, escaped, escapeChar);
     case STRUCT:
       StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
@@ -249,19 +277,19 @@ public static ObjectInspector createLazyObjectInspector(TypeInfo typeInfo,
       for (int i = 0; i < fieldTypeInfos.size(); i++) {
         fieldObjectInspectors.add(createLazyObjectInspector(fieldTypeInfos
             .get(i), separator, separatorIndex + 1, nullSequence, escaped,
-            escapeChar));
+            escapeChar, option));
       }
       return LazyObjectInspectorFactory.getLazySimpleStructObjectInspector(
           fieldNames, fieldObjectInspectors,
           LazyUtils.getSeparator(separator, separatorIndex),
-          nullSequence, false, escaped, escapeChar);
+          nullSequence, false, escaped, escapeChar, option);
     case UNION:
       UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
       List lazyOIs = new ArrayList();
       for (TypeInfo uti : unionTypeInfo.getAllUnionObjectTypeInfos()) {
         lazyOIs.add(createLazyObjectInspector(uti, separator,
             separatorIndex + 1, nullSequence, escaped,
-            escapeChar));
+            escapeChar, option));
       }
       return LazyObjectInspectorFactory.getLazyUnionObjectInspector(lazyOIs,
           LazyUtils.getSeparator(separator, separatorIndex),
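Threading option through every recursive call means nested map, list, struct and union types inherit the data format of the enclosing type. A usage sketch of the new overload (the class name, type string, separators and null sequence below are illustrative):

import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;

public class AvroStructOIExample {
  public static ObjectInspector avroStructOI() throws SerDeException {
    TypeInfo typeInfo =
        TypeInfoUtils.getTypeInfoFromTypeString("struct<name:string,age:int>");
    byte[] separators = {(byte) 1, (byte) 2, (byte) 3, (byte) 4};
    // Every nested inspector under this struct is created with the AVRO option too.
    return LazyFactory.createLazyObjectInspector(typeInfo, separators, 0,
        new Text("\\N"), false, (byte) 0, ObjectInspectorOptions.AVRO);
  }
}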
@@ -286,11 +314,51 @@
 public static ObjectInspector createLazyStructInspector(
     List columnNames, List typeInfos, byte[] separators,
     Text nullSequence, boolean lastColumnTakesRest, boolean escaped,
     byte escapeChar) throws SerDeException {
+    return createLazyStructInspector(columnNames, typeInfos, separators, nullSequence,
+        lastColumnTakesRest, escaped, escapeChar, null);
+  }
+
+  /**
+   * Create a hierarchical ObjectInspector for LazyStruct with the given columnNames and
+   * columnTypeInfos.
+   *
+   * @param lastColumnTakesRest
+   *          whether the last column of the struct should take the rest of the row if there
+   *          are extra fields.
+   * @param columnToDataType
+   *          Maps a column name to its {@link ObjectInspectorOptions type}. The valid values
+   *          are {@link ObjectInspectorOptions#JAVA} or {@link ObjectInspectorOptions#AVRO}.
+   * @throws SerDeException
+   * @see LazyFactory#createLazyObjectInspector(TypeInfo, byte[], int, Text, boolean, byte)
+   */
+  public static ObjectInspector createLazyStructInspector(
+      List columnNames, List typeInfos, byte[] separators,
+      Text nullSequence, boolean lastColumnTakesRest, boolean escaped,
+      byte escapeChar, Map columnToDataType) throws SerDeException {
     ArrayList columnObjectInspectors = new ArrayList(typeInfos.size());
+
+    if (columnToDataType != null && columnToDataType.size() != columnNames.size()) {
+      throw new IllegalArgumentException(
+          "The size of the column-to-data-type map does not match the number of columns");
+    }
+
     for (int i = 0; i < typeInfos.size(); i++) {
+      ObjectInspectorOptions option = ObjectInspectorOptions.JAVA;
+
+      if (columnToDataType != null) {
+        option = columnToDataType.get(columnNames.get(i));
+
+        if (option == null) {
+          throw new IllegalArgumentException("The data type for column [" + columnNames.get(i)
+              + "] was null");
+        }
+      }
+
       columnObjectInspectors.add(LazyFactory.createLazyObjectInspector(
-          typeInfos.get(i), separators, 1, nullSequence, escaped, escapeChar));
+          typeInfos.get(i), separators, 1, nullSequence, escaped, escapeChar, option));
     }
     return LazyObjectInspectorFactory.getLazySimpleStructObjectInspector(
         columnNames, columnObjectInspectors, separators[0], nullSequence,
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java
index 8a1ea46..1c45753 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java
@@ -275,4 +275,13 @@ protected void setFieldInited(boolean[] fieldInited) {
   public long getRawDataSerializedSize() {
     return serializedSize;
   }
+
+  /**
+   * Return the bytes backing this struct. This is useful in cases where the data is stored in
+   * a serialized format such as Protocol Buffers or Thrift and needs a custom deserializer to
+   * be read.
+   */
+  public byte[] getBytes() {
+    return bytes.getData();
+  }
 }
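The columnToDataType map gives per-column control over the inspector type: passing null keeps the old all-Java behavior, and a map whose size disagrees with the column list fails fast, as the guard above shows. A sketch, with illustrative column names and types:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;

public class MixedRowOIExample {
  public static ObjectInspector mixedRowOI() throws SerDeException {
    Map<String, ObjectInspectorOptions> columnToDataType =
        new HashMap<String, ObjectInspectorOptions>();
    columnToDataType.put("key", ObjectInspectorOptions.JAVA);     // plain lazy column
    columnToDataType.put("payload", ObjectInspectorOptions.AVRO); // Avro-backed column

    return LazyFactory.createLazyStructInspector(
        Arrays.asList("key", "payload"),
        TypeInfoUtils.getTypeInfosFromTypeString("string,struct<a:int,b:string>"),
        new byte[] {(byte) 1, (byte) 2, (byte) 3, (byte) 4},
        new Text("\\N"), false, false, (byte) 0, columnToDataType);
  }
}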
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUnion.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUnion.java
index 9f6bc3f..6a161adf 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUnion.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUnion.java
@@ -41,7 +41,7 @@
   /**
    * The object of the union.
    */
-  private LazyObject field;
+  private Object field;
 
   /**
    * Tag of the Union
@@ -54,6 +54,16 @@
   private boolean fieldInited = false;
 
   /**
+   * Whether the tag has been set or not
+   */
+  private boolean tagSet = false;
+
+  /**
+   * Whether the field has been set or not
+   */
+  private boolean fieldSet = false;
+
+  /**
    * Construct a LazyUnion object with the ObjectInspector.
    */
   public LazyUnion(LazyUnionObjectInspector oi) {
@@ -123,6 +133,7 @@ private void parse() {
    *
    * @return The value of the field
    */
+  @SuppressWarnings("rawtypes")
   private Object uncheckedGetField() {
     Text nullSequence = oi.getNullSequence();
     int fieldLength = start + length - startPosition;
@@ -134,9 +145,9 @@ private Object uncheckedGetField() {
     if (!fieldInited) {
       fieldInited = true;
-      field.init(bytes, startPosition, fieldLength);
+      ((LazyObject) field).init(bytes, startPosition, fieldLength);
     }
-    return field.getObject();
+    return ((LazyObject) field).getObject();
   }
 
   /**
@@ -145,6 +156,10 @@ private Object uncheckedGetField() {
    * @return The field as a LazyObject
    */
   public Object getField() {
+    if (fieldSet) {
+      return field;
+    }
+
     if (!parsed) {
       parse();
     }
@@ -157,9 +172,33 @@
    * @return The tag byte
    */
   public byte getTag() {
+    if (tagSet) {
+      return tag;
+    }
+
     if (!parsed) {
       parse();
     }
     return tag;
   }
+
+  /**
+   * Set the field of the union
+   *
+   * @param field the field to be set
+   */
+  public void setField(Object field) {
+    this.field = field;
+    fieldSet = true;
+  }
+
+  /**
+   * Set the tag for the union
+   *
+   * @param tag the tag to be set
+   */
+  public void setTag(byte tag) {
+    this.tag = tag;
+    tagSet = true;
+  }
 }
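The new setters let a caller populate a union eagerly with an already-deserialized value; the tagSet and fieldSet flags then make the getters return those values without ever parsing the raw bytes. This is presumably how the Avro path hands over objects it has deserialized itself. A sketch, with the inspector and value supplied by the caller:

import org.apache.hadoop.hive.serde2.lazy.LazyUnion;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyUnionObjectInspector;

public class LazyUnionInjectionExample {
  // Returns the injected value; neither getField() nor getTag() triggers parse().
  public static Object injectIntoUnion(LazyUnionObjectInspector unionOI, Object deserialized) {
    LazyUnion union = new LazyUnion(unionOI);
    union.setTag((byte) 1);        // select the union branch for the value
    union.setField(deserialized);  // hand over an already-deserialized object
    return union.getField();
  }
}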
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazyObjectInspectorFactory.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazyObjectInspectorFactory.java
index ed670b0..d829d52 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazyObjectInspectorFactory.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazyObjectInspectorFactory.java
@@ -22,7 +22,9 @@
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.hive.serde2.avro.AvroLazyObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -42,20 +44,40 @@
       new ConcurrentHashMap, LazySimpleStructObjectInspector>();
 
   public static LazySimpleStructObjectInspector getLazySimpleStructObjectInspector(
+      List structFieldNames,
+      List structFieldObjectInspectors, byte separator,
+      Text nullSequence, boolean lastColumnTakesRest, boolean escaped,
+      byte escapeChar) {
+    return getLazySimpleStructObjectInspector(structFieldNames,
+        structFieldObjectInspectors, separator, nullSequence,
+        lastColumnTakesRest, escaped, escapeChar, ObjectInspectorOptions.JAVA);
+  }
+
+  public static LazySimpleStructObjectInspector getLazySimpleStructObjectInspector(
       List structFieldNames,
       List structFieldObjectInspectors, byte separator,
       Text nullSequence, boolean lastColumnTakesRest, boolean escaped,
-      byte escapeChar) {
+      byte escapeChar, ObjectInspectorOptions option) {
     return getLazySimpleStructObjectInspector(structFieldNames,
         structFieldObjectInspectors, null, separator, nullSequence,
-        lastColumnTakesRest, escaped, escapeChar);
+        lastColumnTakesRest, escaped, escapeChar, option);
+  }
+
+  public static LazySimpleStructObjectInspector getLazySimpleStructObjectInspector(
+      List structFieldNames,
+      List structFieldObjectInspectors, List structFieldComments,
+      byte separator, Text nullSequence, boolean lastColumnTakesRest,
+      boolean escaped, byte escapeChar) {
+    return getLazySimpleStructObjectInspector(structFieldNames,
+        structFieldObjectInspectors, structFieldComments, separator, nullSequence,
+        lastColumnTakesRest, escaped, escapeChar, ObjectInspectorOptions.JAVA);
   }
 
   public static LazySimpleStructObjectInspector getLazySimpleStructObjectInspector(
       List structFieldNames,
       List structFieldObjectInspectors, List structFieldComments,
       byte separator, Text nullSequence, boolean lastColumnTakesRest,
-      boolean escaped,byte escapeChar) {
+      boolean escaped, byte escapeChar, ObjectInspectorOptions option) {
     ArrayList signature = new ArrayList();
     signature.add(structFieldNames);
     signature.add(structFieldObjectInspectors);
@@ -64,15 +86,28 @@ public static LazySimpleStructObjectInspector getLazySimpleStructObjectInspector
     signature.add(Boolean.valueOf(lastColumnTakesRest));
     signature.add(Boolean.valueOf(escaped));
     signature.add(Byte.valueOf(escapeChar));
+    signature.add(option);
     if(structFieldComments != null) {
       signature.add(structFieldComments);
     }
     LazySimpleStructObjectInspector result = cachedLazySimpleStructObjectInspector
         .get(signature);
     if (result == null) {
-      result = new LazySimpleStructObjectInspector(structFieldNames,
-          structFieldObjectInspectors, structFieldComments, separator,
-          nullSequence, lastColumnTakesRest, escaped, escapeChar);
+      switch (option) {
+      case JAVA:
+        result = new LazySimpleStructObjectInspector(structFieldNames,
+            structFieldObjectInspectors, structFieldComments, separator,
+            nullSequence, lastColumnTakesRest, escaped, escapeChar);
+        break;
+      case AVRO:
+        result = new AvroLazyObjectInspector(structFieldNames,
+            structFieldObjectInspectors, structFieldComments, separator,
+            nullSequence, lastColumnTakesRest, escaped, escapeChar);
+        break;
+      default:
+        throw new IllegalArgumentException("Illegal ObjectInspector type [" + option + "]");
+      }
+      cachedLazySimpleStructObjectInspector.put(signature, result);
     }
     return result;
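Note that option participates in the cache signature (signature.add(option) above), so a JAVA and an AVRO inspector for the same field layout occupy separate cache slots rather than shadowing each other. A sketch of the dispatch, with field names and inspectors supplied by the caller:

import java.util.List;

import org.apache.hadoop.hive.serde2.avro.AvroLazyObjectInspector;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.io.Text;

public class OptionDispatchExample {
  public static void demo(List<String> fieldNames, List<ObjectInspector> fieldOIs) {
    Text nullSequence = new Text("\\N");

    LazySimpleStructObjectInspector javaOI =
        LazyObjectInspectorFactory.getLazySimpleStructObjectInspector(fieldNames,
            fieldOIs, (byte) 1, nullSequence, false, false, (byte) 0,
            ObjectInspectorOptions.JAVA);

    LazySimpleStructObjectInspector avroOI =
        LazyObjectInspectorFactory.getLazySimpleStructObjectInspector(fieldNames,
            fieldOIs, (byte) 1, nullSequence, false, false, (byte) 0,
            ObjectInspectorOptions.AVRO);

    // Same field layout, but separate cache entries and runtime types.
    assert !(javaOI instanceof AvroLazyObjectInspector);
    assert avroOI instanceof AvroLazyObjectInspector;
  }
}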
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java
index 8a5386a..801b49a 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java
@@ -23,7 +23,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.hive.serde2.avro.AvroLazyObjectInspector;
 import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -88,7 +90,7 @@ public String toString() {
     }
   }
 
-  private List fields;
+  protected List fields;
   private byte separator;
   private Text nullSequence;
   private boolean lastColumnTakesRest;
@@ -190,6 +192,23 @@ public Object getStructFieldData(Object data, StructField fieldRef) {
     int fieldID = f.getFieldID();
     assert (fieldID >= 0 && fieldID < fields.size());
 
+    ObjectInspector oi = f.getFieldObjectInspector();
+
+    if (oi instanceof AvroLazyObjectInspector) {
+      return ((AvroLazyObjectInspector) oi).getStructFieldData(data, fieldRef);
+    }
+
+    if (oi instanceof MapObjectInspector) {
+      ObjectInspector valueOI = ((MapObjectInspector) oi).getMapValueObjectInspector();
+
+      if (valueOI instanceof AvroLazyObjectInspector) {
+        return ((AvroLazyObjectInspector) valueOI).getStructFieldData(data, fieldRef);
+      }
+    }
+
     return struct.getField(fieldID);
   }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
index 9a226b3..eded091 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
@@ -57,7 +57,7 @@
  * for the same Java type.
  */
 public enum ObjectInspectorOptions {
-    JAVA, THRIFT, PROTOCOL_BUFFERS
+    JAVA, THRIFT, PROTOCOL_BUFFERS, AVRO
 };
 
 private static ConcurrentHashMap objectInspectorCache = new ConcurrentHashMap();
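Taken together, the enum gains an AVRO member, the lazy factories dispatch on it, and the schema arrives through the retriever named in the table properties. How the SerDe consumes avro.schema.retriever is outside these hunks; reflective instantiation along the following lines is one plausible reading (the implementation class name is hypothetical):

import java.util.Properties;

import org.apache.hadoop.hive.serde2.avro.AvroSchemaRetriever;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;

public class RetrieverWiringExample {
  public static AvroSchemaRetriever loadRetriever(Properties tableProperties) throws Exception {
    // e.g. "com.example.PropertiesSchemaRetriever" (a hypothetical implementation class)
    String retrieverClass = tableProperties.getProperty(AvroSerdeUtils.SCHEMA_RETRIEVER);
    if (retrieverClass == null) {
      return null; // fall back to avro.schema.literal / avro.schema.url
    }
    return (AvroSchemaRetriever) Class.forName(retrieverClass).newInstance();
  }
}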