diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java index 5731e45..12c5377 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hive.hbase; +import java.io.IOException; +import java.util.Properties; + import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.LazyFactory; import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; @@ -26,9 +29,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import java.io.IOException; -import java.util.Properties; - public class DefaultHBaseKeyFactory extends AbstractHBaseKeyFactory implements HBaseKeyFactory { protected LazySimpleSerDe.SerDeParameters serdeParams; diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java index ca2f40e..aedd843 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java @@ -53,6 +53,7 @@ public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class"; public static final String HBASE_COMPOSITE_KEY_TYPES = "hbase.composite.key.types"; public static final String HBASE_COMPOSITE_KEY_FACTORY = "hbase.composite.key.factory"; + public static final String HBASE_STRUCT_SERIALIZER_CLASS = "hbase.struct.serialization.class"; public static final String HBASE_SCAN_CACHE = "hbase.scan.cache"; public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock"; public static final String HBASE_SCAN_BATCH = "hbase.scan.batch"; @@ -98,7 +99,7 @@ public void initialize(Configuration conf, Properties tbl) cachedHBaseRow = 
new LazyHBaseRow( (LazySimpleStructObjectInspector) cachedObjectInspector, - serdeParams.getKeyIndex(), serdeParams.getKeyFactory()); + serdeParams.getKeyIndex(), serdeParams.getKeyFactory(), serdeParams.getValueFactories()); serializer = new HBaseRowSerializer(serdeParams); diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java index 25a9cfc..9f2f02f 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java @@ -41,6 +41,10 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator; import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.util.StringUtils; @@ -371,6 +375,19 @@ public static Schema getSchemaFromFS(String schemaFSUrl, Configuration conf) } /** + * Create the {@link LazyObjectBase lazy field} + * */ + public static LazyObjectBase createLazyField(ColumnMapping[] columnMappings, int fieldID, + ObjectInspector inspector) { + ColumnMapping colMap = columnMappings[fieldID]; + if (colMap.getQualifierName() == null && !colMap.isHbaseRowKey()) { + // a column family + return new LazyHBaseCellMap((LazyMapObjectInspector) inspector); + } + return LazyFactory.createLazyObject(inspector, colMap.getBinaryStorage().get(0)); + } + + /** * Auto-generates the key struct for composite keys * * @param compositeKeyParts map of composite key part name to its type. 
Usually this would be diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java index 8878eb5..9efa494 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.hbase.struct.AvroHBaseValueFactory; import org.apache.hadoop.hive.hbase.struct.DefaultHBaseValueFactory; import org.apache.hadoop.hive.hbase.struct.HBaseValueFactory; +import org.apache.hadoop.hive.hbase.struct.StructHBaseValueFactory; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; @@ -204,11 +205,21 @@ private static HBaseKeyFactory createKeyFactory(Configuration job, Properties tb for (int i = 0; i < columnMappings.size(); i++) { String serType = getSerializationType(conf, tbl, columnMappings.getColumnsMapping()[i]); - if (serType != null && serType.equals(AVRO_SERIALIZATION_TYPE)) { + if (AVRO_SERIALIZATION_TYPE.equals(serType)) { Schema schema = getSchema(conf, tbl, columnMappings.getColumnsMapping()[i]); - valueFactories.add(new AvroHBaseValueFactory(schema)); + valueFactories.add(new AvroHBaseValueFactory(i, schema)); + } else if (STRUCT_SERIALIZATION_TYPE.equals(serType)) { + String structValueClassName = tbl.getProperty(HBaseSerDe.HBASE_STRUCT_SERIALIZER_CLASS); + + if (structValueClassName == null) { + throw new IllegalArgumentException(HBaseSerDe.HBASE_STRUCT_SERIALIZER_CLASS + + " must be set for hbase columns of type [" + STRUCT_SERIALIZATION_TYPE + "]"); + } + + Class structValueClass = job.getClassByName(structValueClassName); + valueFactories.add(new StructHBaseValueFactory(i, structValueClass)); } else { - valueFactories.add(new DefaultHBaseValueFactory()); + valueFactories.add(new DefaultHBaseValueFactory(i)); } 
} } catch (Exception e) { diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java index 3e8b8fd..6ac8423 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java @@ -20,15 +20,15 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; +import org.apache.hadoop.hive.hbase.struct.HBaseValueFactory; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; -import org.apache.hadoop.hive.serde2.lazy.LazyFactory; import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; import org.apache.hadoop.hive.serde2.lazy.LazyStruct; -import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; @@ -47,18 +47,21 @@ private final int iKey; private final HBaseKeyFactory keyFactory; + private final List valueFactories; public LazyHBaseRow(LazySimpleStructObjectInspector oi) { - this(oi, -1, null); + this(oi, -1, null, null); } /** * Construct a LazyHBaseRow object with the ObjectInspector. 
*/ - public LazyHBaseRow(LazySimpleStructObjectInspector oi, int iKey, HBaseKeyFactory keyFactory) { + public LazyHBaseRow(LazySimpleStructObjectInspector oi, int iKey, HBaseKeyFactory keyFactory, + List valueFactories) { super(oi); this.iKey = iKey; this.keyFactory = keyFactory; + this.valueFactories = valueFactories; } /** @@ -76,13 +79,14 @@ protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef) thro if (fieldID == iKey) { return keyFactory.createKey(fieldRef.getFieldObjectInspector()); } - ColumnMapping colMap = columnsMapping[fieldID]; - if (colMap.qualifierName == null && !colMap.hbaseRowKey) { - // a column family - return new LazyHBaseCellMap((LazyMapObjectInspector) fieldRef.getFieldObjectInspector()); + + if (valueFactories != null) { + return valueFactories.get(fieldID).createValueObject(fieldRef.getFieldObjectInspector()); } - return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector(), - colMap.binaryStorage.get(0)); + + // fallback to default + return HBaseSerDeHelper.createLazyField(columnsMapping, fieldID, + fieldRef.getFieldObjectInspector()); } /** diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/AvroHBaseValueFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/AvroHBaseValueFactory.java index c341c0a..a2ba827 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/AvroHBaseValueFactory.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/AvroHBaseValueFactory.java @@ -48,7 +48,8 @@ * * @param schema the associated {@link Schema schema} * */ - public AvroHBaseValueFactory(Schema schema) { + public AvroHBaseValueFactory(int fieldID, Schema schema) { + super(fieldID); this.schema = schema; } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java index ac2cb57..f7a425d 100644 --- 
hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java @@ -21,9 +21,12 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.hbase.ColumnMappings; +import org.apache.hadoop.hive.hbase.HBaseSerDeHelper; import org.apache.hadoop.hive.hbase.HBaseSerDeParameters; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; @@ -35,15 +38,23 @@ public class DefaultHBaseValueFactory implements HBaseValueFactory{ protected LazySimpleSerDe.SerDeParameters serdeParams; + protected ColumnMappings columnMappings; protected HBaseSerDeParameters hbaseParams; protected Properties properties; protected Configuration conf; + private int fieldID; + + public DefaultHBaseValueFactory(int fieldID) { + this.fieldID = fieldID; + } + @Override public void init(HBaseSerDeParameters hbaseParams, Configuration conf, Properties properties) throws SerDeException { this.hbaseParams = hbaseParams; this.serdeParams = hbaseParams.getSerdeParams(); + this.columnMappings = hbaseParams.getColumnMappings(); this.properties = properties; this.conf = conf; } @@ -55,6 +66,11 @@ public ObjectInspector createValueObjectInspector(TypeInfo type) 1, serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar()); } + @Override + public LazyObjectBase createValueObject(ObjectInspector inspector) throws SerDeException { + return HBaseSerDeHelper.createLazyField(columnMappings.getColumnsMapping(), fieldID, inspector); + } + @Override public byte[] serializeValue(Object object, StructField field) throws IOException { 
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseStructValue.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseStructValue.java new file mode 100644 index 0000000..8fba79b --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseStructValue.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.hbase.struct; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyStruct; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; + +/** + * This is an extension of LazyStruct. All value structs should extend this class and override the + * {@link LazyStruct#getField(int)} method where fieldID corresponds to the ID of a value in the + * value structure. + *

+ * For example, for a value structure "/part1/part2/part3", part1 will have an id + * 0, part2 will have an id 1 and part3 will have an id 2. Custom + * implementations of getField(fieldID) should return the value corresponding to that fieldID. So, + * for the above example, the value returned for getField(0) should be part1, + * getField(1) should be part2 and getField(2) should be part3. + *

+ *

+ * All implementations are expected to have a constructor of the form
+ * + *

+ * MyCustomStructObject(LazySimpleStructObjectInspector oi, Properties props, Configuration conf, ColumnMapping colMap)
+ * 
+ * + *

+ * */ +public class HBaseStructValue extends LazyStruct { + + /** + * The column family name + */ + protected String familyName; + + /** + * The column qualifier name + */ + protected String qualifierName; + + public HBaseStructValue(LazySimpleStructObjectInspector oi) { + super(oi); + } + + /** + * Set the row data for this LazyStruct. + * + * @see LazyObject#init(ByteArrayRef, int, int) + * + * @param familyName The column family name + * @param qualifierName The column qualifier name + */ + public void init(ByteArrayRef bytes, int start, int length, String familyName, + String qualifierName) { + init(bytes, start, length); + this.familyName = familyName; + this.qualifierName = qualifierName; + } + + @Override + public ArrayList getFieldsAsList() { + ArrayList allFields = new ArrayList(); + + List fields = oi.getAllStructFieldRefs(); + + for (int i = 0; i < fields.size(); i++) { + allFields.add(getField(i)); + } + + return allFields; + } + + /** + * Create an initialize a {@link LazyObject} with the given bytes for the given fieldID. 
+ * + * @param fieldID field for which the object is to be created + * @param bytes value with which the object is to be initialized with + * @return initialized {@link LazyObject} + * */ + public LazyObject toLazyObject(int fieldID, byte[] bytes) { + ObjectInspector fieldOI = oi.getAllStructFieldRefs().get(fieldID).getFieldObjectInspector(); + + LazyObject lazyObject = LazyFactory.createLazyObject(fieldOI); + + ByteArrayRef ref = new ByteArrayRef(); + + ref.setData(bytes); + + // initialize the lazy object + lazyObject.init(ref, 0, ref.getData().length); + + return lazyObject; + } +} \ No newline at end of file diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseValueFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseValueFactory.java index 8722af0..3fead1e 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseValueFactory.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseValueFactory.java @@ -22,8 +22,10 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.hbase.HBaseKeyFactory; import org.apache.hadoop.hive.hbase.HBaseSerDeParameters; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; @@ -53,6 +55,13 @@ void init(HBaseSerDeParameters hbaseParam, Configuration conf, Properties proper ObjectInspector createValueObjectInspector(TypeInfo type) throws SerDeException; /** + * create custom object for hbase value + * + * @param inspector OI create by {@link HBaseKeyFactory#createKeyObjectInspector} + */ + LazyObjectBase createValueObject(ObjectInspector inspector) throws SerDeException; + + /** * Serialize the given hive object * * @param object the object to be serialized 
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/StructHBaseValueFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/StructHBaseValueFactory.java new file mode 100644 index 0000000..e467787 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/StructHBaseValueFactory.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.hbase.struct; + +import java.lang.reflect.Constructor; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * Implementation of {@link HBaseValueFactory} to consume a custom struct + * */ +public class StructHBaseValueFactory extends DefaultHBaseValueFactory { + + private final int fieldID; + private final Constructor constructor; + + public StructHBaseValueFactory(int fieldID, Class structValueClass) throws Exception { + super(fieldID); + this.fieldID = fieldID; + this.constructor = + structValueClass.getDeclaredConstructor(LazySimpleStructObjectInspector.class, + Properties.class, Configuration.class, ColumnMapping.class); + } + + @Override + public LazyObjectBase createValueObject(ObjectInspector inspector) throws SerDeException { + try { + return (T) constructor.newInstance(inspector, properties, hbaseParams.getBaseConfiguration(), + hbaseParams.getColumnMappings().getColumnsMapping()[fieldID]); + } catch (Exception e) { + throw new SerDeException(e); + } + } +} \ No newline at end of file diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestStructSerializer.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestStructSerializer.java new file mode 100644 index 0000000..73d1903 --- /dev/null +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestStructSerializer.java @@ -0,0 +1,75 @@ +package org.apache.hadoop.hive.hbase; + +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; +import 
org.apache.hadoop.hive.hbase.struct.HBaseStructValue; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * Test specific implementation of {@link org.apache.hadoop.hive.serde2.lazy.LazyStruct} + */ +public class HBaseTestStructSerializer extends HBaseStructValue { + + protected byte[] bytes; + protected String bytesAsString; + protected Properties tbl; + protected Configuration conf; + protected ColumnMapping colMapping; + protected String testValue; + + public HBaseTestStructSerializer(LazySimpleStructObjectInspector oi, Properties tbl, + Configuration conf, ColumnMapping colMapping) { + super(oi); + this.tbl = tbl; + this.conf = conf; + this.colMapping = colMapping; + } + + @Override + public void init(ByteArrayRef bytes, int start, int length) { + this.bytes = bytes.getData(); + } + + @Override + public Object getField(int fieldID) { + if (bytesAsString == null) { + bytesAsString = Bytes.toString(bytes).trim(); + } + + // Randomly pick the character corresponding to the field id and convert it to byte array + byte[] fieldBytes = new byte[] { (byte) bytesAsString.charAt(fieldID) }; + + return toLazyObject(fieldID, fieldBytes); + } + + /** + * Create an initialize a {@link LazyObject} with the given bytes for the given fieldID. 
+ * + * @param fieldID field for which the object is to be created + * @param bytes value with which the object is to be initialized with + * + * @return initialized {@link LazyObject} + * */ + @Override + public LazyObject toLazyObject(int fieldID, byte[] bytes) { + ObjectInspector fieldOI = oi.getAllStructFieldRefs().get(fieldID).getFieldObjectInspector(); + + LazyObject lazyObject = LazyFactory.createLazyObject(fieldOI); + + ByteArrayRef ref = new ByteArrayRef(); + + ref.setData(bytes); + + // initialize the lazy object + lazyObject.init(ref, 0, ref.getData().length); + + return lazyObject; + } +} \ No newline at end of file diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java index 241818a..42b2444 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Properties; +import junit.framework.Assert; import junit.framework.TestCase; import org.apache.avro.Schema; @@ -61,6 +62,7 @@ import org.apache.hadoop.hive.serde2.io.ShortWritable; import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazyStruct; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.BooleanWritable; @@ -135,6 +137,27 @@ " ]\n" + "}"; + private static final String EXPECTED_DESERIALIZED_AVRO_STRING = + "{\"key\":\"test-row1\",\"cola_avro\":{\"arecord\":{\"int1\":42,\"boolean1\":true," + + "\"long1\":42432234234}}}"; + + private static final String EXPECTED_DESERIALIZED_AVRO_STRING_2 = + "{\"key\":\"test-row1\"," + + "\"cola_avro\":{\"employeename\":\"Avro Employee1\"," + + 
"\"employeeid\":11111,\"age\":25,\"gender\":\"FEMALE\"," + + "\"contactinfo\":{\"address\":[{\"address1\":\"Avro First Address1\",\"address2\":" + + "\"Avro Second Address1\",\"city\":\"Avro City1\",\"zipcode\":123456,\"county\":" + + "{0:{\"areacode\":999,\"number\":1234567890}},\"aliases\":null,\"metadata\":" + + "{\"testkey\":\"testvalue\"}},{\"address1\":\"Avro First Address1\",\"address2\":" + + "\"Avro Second Address1\",\"city\":\"Avro City1\",\"zipcode\":123456,\"county\":" + + "{0:{\"areacode\":999,\"number\":1234567890}},\"aliases\":null,\"metadata\":" + + "{\"testkey\":\"testvalue\"}}],\"homephone\":{\"areacode\":999,\"number\":1234567890}," + + "\"officephone\":{\"areacode\":999,\"number\":1234455555}}}}"; + + private static final String EXPECTED_DESERIALIZED_AVRO_STRING_3 = + "{\"key\":\"test-row1\",\"cola_avro\":{\"arecord\":{\"int1\":42,\"string1\":\"test\"," + + "\"boolean1\":true,\"long1\":42432234234}}}"; + /** * Test the default behavior of the Lazy family of objects and object inspectors. 
*/ @@ -1047,7 +1070,8 @@ public void testHBaseSerDeWithAvroSchemaInline() throws SerDeException, IOExcept Properties tbl = createPropertiesForHiveAvroSchemaInline(); serDe.initialize(conf, tbl); - deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData); + deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData, + EXPECTED_DESERIALIZED_AVRO_STRING); } private Properties createPropertiesForHiveAvroSchemaInline() { @@ -1092,7 +1116,8 @@ public void testHBaseSerDeWithForwardEvolvedSchema() throws SerDeException, IOEx Properties tbl = createPropertiesForHiveAvroForwardEvolvedSchema(); serDe.initialize(conf, tbl); - deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData); + deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData, + EXPECTED_DESERIALIZED_AVRO_STRING_3); } private Properties createPropertiesForHiveAvroForwardEvolvedSchema() { @@ -1136,7 +1161,8 @@ public void testHBaseSerDeWithBackwardEvolvedSchema() throws SerDeException, IOE Properties tbl = createPropertiesForHiveAvroBackwardEvolvedSchema(); serDe.initialize(conf, tbl); - deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData); + deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData, + EXPECTED_DESERIALIZED_AVRO_STRING); } private Properties createPropertiesForHiveAvroBackwardEvolvedSchema() { @@ -1185,7 +1211,8 @@ public void testHBaseSerDeWithAvroSerClass() throws SerDeException, IOException Properties tbl = createPropertiesForHiveAvroSerClass(); serDe.initialize(conf, tbl); - deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData); + deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData, + EXPECTED_DESERIALIZED_AVRO_STRING_2); } private Properties createPropertiesForHiveAvroSerClass() { @@ -1243,7 +1270,8 @@ public void testHBaseSerDeWithAvroSchemaUrl() throws SerDeException, IOException Properties tbl = createPropertiesForHiveAvroSchemaUrl(onHDFS); serDe.initialize(conf, tbl); - deserializeAndSerializeHiveAvro(serDe, r, p, 
expectedFieldsData); + deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData, + EXPECTED_DESERIALIZED_AVRO_STRING); } finally { // Teardown the cluster if (miniDfs != null) { @@ -1298,7 +1326,8 @@ public void testHBaseSerDeWithAvroExternalSchema() throws SerDeException, IOExce Properties tbl = createPropertiesForHiveAvroExternalSchema(); serDe.initialize(conf, tbl); - deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData); + deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData, + EXPECTED_DESERIALIZED_AVRO_STRING_2); } private Properties createPropertiesForHiveAvroExternalSchema() { @@ -1389,8 +1418,87 @@ private Properties createPropertiesForHiveAvroColumnFamilyMap() { return tbl; } + public void testHBaseSerDeCustomStructValue() throws IOException, SerDeException { + + byte[] cfa = "cola".getBytes(); + byte[] qualStruct = "struct".getBytes(); + + TestStruct testStruct = new TestStruct("A", "B", "C", false, (byte) 0); + byte[] key = testStruct.getBytes(); + // Data + List kvs = new ArrayList(); + + byte[] testData = testStruct.getBytes(); + kvs.add(new KeyValue(key, cfa, qualStruct, testData)); + + Result r = new Result(kvs); + byte[] putKey = testStruct.getBytesWithDelimiters(); + + Put p = new Put(putKey); + + // Post serialization, separators are automatically inserted between different fields in the + // struct. Currently there is not way to disable that. 
So the work around here is to pad the + // data with the separator bytes before creating a "Put" object + p.add(new KeyValue(putKey, cfa, qualStruct, Bytes.padTail(testData, 2))); + + // Create, initialize, and test the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createPropertiesForValueStruct(); + serDe.initialize(conf, tbl); + + deserializeAndSerializeHBaseValueStruct(serDe, r, p); + + } + + private Properties createPropertiesForValueStruct() { + Properties tbl = new Properties(); + tbl.setProperty("cola.struct.serialization.type", "struct"); + tbl.setProperty("cola.struct.test.value", "test value"); + tbl.setProperty(HBaseSerDe.HBASE_STRUCT_SERIALIZER_CLASS, + "org.apache.hadoop.hive.hbase.HBaseTestStructSerializer"); + tbl.setProperty(serdeConstants.LIST_COLUMNS, "key,astring"); + tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, + "struct,struct"); + tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key,cola:struct"); + tbl.setProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS, + "org.apache.hadoop.hive.hbase.HBaseTestCompositeKey"); + return tbl; + } + + private void deserializeAndSerializeHBaseValueStruct(HBaseSerDe serDe, Result r, Put p) + throws SerDeException, IOException { + StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector(); + + List fieldRefs = soi.getAllStructFieldRefs(); + + Object row = serDe.deserialize(new ResultWritable(r)); + + Object fieldData = null; + for (int j = 0; j < fieldRefs.size(); j++) { + fieldData = soi.getStructFieldData(row, fieldRefs.get(j)); + assertNotNull(fieldData); + if (fieldData instanceof LazyStruct) { + assertEquals(((LazyStruct) fieldData).getField(0).toString(), "A"); + assertEquals(((LazyStruct) fieldData).getField(1).toString(), "B"); + assertEquals(((LazyStruct) fieldData).getField(2).toString(), "C"); + } else { + Assert.fail("fieldData should be an instance of LazyStruct"); + } + } + + assertEquals( + 
"{\"key\":{\"col1\":\"A\",\"col2\":\"B\",\"col3\":\"C\"},\"astring\":{\"col1\":\"A\",\"col2\":\"B\",\"col3\":\"C\"}}", + SerDeUtils.getJSONString(row, soi)); + + // Now serialize + Put put = ((PutWritable) serDe.serialize(row, soi)).getPut(); + + assertEquals("Serialized put:", p.toString(), put.toString()); + } + private void deserializeAndSerializeHiveAvro(HBaseSerDe serDe, Result r, Put p, - Object[] expectedFieldsData) + Object[] expectedFieldsData, String expectedDeserializedAvroString) throws SerDeException, IOException { StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector(); @@ -1403,6 +1511,8 @@ private void deserializeAndSerializeHiveAvro(HBaseSerDe serDe, Result r, Put p, assertNotNull(fieldData); assertEquals(expectedFieldsData[j], fieldData.toString().trim()); } + + assertEquals(expectedDeserializedAvroString, SerDeUtils.getJSONString(row, soi)); // Now serialize Put put = ((PutWritable) serDe.serialize(row, soi)).getPut();