diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
new file mode 100644
index 0000000..cfa3826
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
@@ -0,0 +1,69 @@
+package org.apache.hadoop.hive.hbase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+
+/**
+ * HBaseCompositeKey extension of LazyStruct. All complex composite keys should extend this class
+ * and override the {@link LazyStruct#getField(int)} method where fieldID corresponds to the ID of a
+ * key in the composite key.
+ * <br>
+ * For example, for a composite key "/part1/part2/part3", part1 will have an id
+ * 0, part2 will have an id 1 and part3 will have an id 2. Custom
+ * implementations of getField(fieldID) should return the value corresponding to that fieldID. So,
+ * for the above example, the value returned for getField(0) should be part1,
+ * getField(1) should be part2 and getField(2) should be part3.
+ * <br>
+ * */
+public class HBaseCompositeKey extends LazyStruct {
+
+  public HBaseCompositeKey(LazySimpleStructObjectInspector oi) {
+    super(oi);
+  }
+
+  @Override
+  public ArrayList<Object> getFieldsAsList() {
+    ArrayList<Object> allFields = new ArrayList<Object>();
+
+    List<? extends StructField> fields = oi.getAllStructFieldRefs();
+
+    for (int i = 0; i < fields.size(); i++) {
+      allFields.add(getField(i));
+    }
+
+    return allFields;
+  }
+
+  /**
+   * Create and initialize a {@link LazyObject} with the given bytes for the given fieldID.
+   *
+   * @param fieldID
+   *          field for which the object is to be created
+   * @param bytes
+   *          value with which the object is to be initialized with
+   * @return initialized {@link LazyObject}
+   * */
+  public LazyObject<? extends ObjectInspector> toLazyObject(int fieldID, byte[] bytes) {
+    ObjectInspector fieldOI = oi.getAllStructFieldRefs().get(fieldID).getFieldObjectInspector();
+
+    LazyObject<? extends ObjectInspector> lazyObject = LazyFactory
+        .createLazyObject(fieldOI);
+
+    ByteArrayRef ref = new ByteArrayRef();
+
+    ref.setData(bytes);
+
+    // initialize the lazy object
+    lazyObject.init(ref, 0, ref.getData().length);
+
+    return lazyObject;
+  }
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
index 4900a41..c6a816e 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.hbase;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -65,11 +66,12 @@
   public static final String HBASE_TABLE_DEFAULT_STORAGE_TYPE = "hbase.table.default.storage.type";
   public static final String HBASE_KEY_COL = ":key";
   public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp";
+  public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class";
   public static final String HBASE_SCAN_CACHE = 
"hbase.scan.cache";
   public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
   public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";
-
-  /** Determines whether a regex matching should be done on the columns or not. Defaults to true.
+
+  /** Determines whether a regex matching should be done on the columns or not. Defaults to true.
    * WARNING: Note that currently this only supports the suffix wildcard .* **/
   public static final String HBASE_COLUMNS_REGEX_MATCHING = "hbase.columns.mapping.regex.matching";
@@ -85,6 +87,8 @@
   private final ByteStream.Output serializeStream = new ByteStream.Output();
   private int iKey;
   private long putTimestamp;
+  private Class<?> compositeKeyClass;
+  private Object compositeKeyObj;
 
   // used for serializing a field
   private byte [] separators;     // the separators array
@@ -131,6 +135,11 @@ public void initialize(Configuration conf, Properties tbl)
     cachedHBaseRow = new LazyHBaseRow(
       (LazySimpleStructObjectInspector) cachedObjectInspector);
 
+    if (compositeKeyClass != null) {
+      // initialize the constructor of the composite key class with its object inspector
+      initCompositeKeyClass(tbl);
+    }
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("HBaseSerDe initialized with : columnNames = "
         + serdeParams.getColumnNames()
@@ -460,6 +469,16 @@ private void initHBaseSerDeParameters(
     doColumnRegexMatching =
       Boolean.valueOf(tbl.getProperty(HBASE_COLUMNS_REGEX_MATCHING, "true"));
 
+    String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
+
+    if (compKeyClass != null) {
+      try {
+        compositeKeyClass = job.getClassByName(compKeyClass);
+      } catch (ClassNotFoundException e) {
+        throw new SerDeException(e);
+      }
+    }
+
     // Parse and initialize the HBase columns mapping
     columnsMapping = parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
 
@@ -543,7 +562,7 @@ public Object deserialize(Writable result) throws SerDeException {
       throw new SerDeException(getClass().getName() + ": expects Result!");
     }
 
-    cachedHBaseRow.init((Result) result, columnsMapping);
+    cachedHBaseRow.init((Result) result, columnsMapping, compositeKeyObj);
 
     return cachedHBaseRow;
   }
@@ -808,6 +827,47 @@ private boolean serialize(
       throw new RuntimeException("Unknown category type: " + objInspector.getCategory());
     }
 
+  /**
+   * Initialize the composite key class with the objectinspector for the key
+   *
+   * @throws SerDeException
+   * */
+  private void initCompositeKeyClass(Properties tbl) throws SerDeException {
+
+    int i = 0;
+
+    // find the hbase row key
+    for (ColumnMapping colMap : columnsMapping) {
+      if (colMap.hbaseRowKey) {
+        break;
+      }
+      i++;
+    }
+
+    ObjectInspector keyObjectInspector = ((LazySimpleStructObjectInspector) cachedObjectInspector)
+        .getAllStructFieldRefs().get(i).getFieldObjectInspector();
+
+    try {
+      compositeKeyObj = compositeKeyClass.getDeclaredConstructor(
+          LazySimpleStructObjectInspector.class, Properties.class)
+          .newInstance(
+              ((LazySimpleStructObjectInspector) keyObjectInspector), tbl);
+    } catch (IllegalArgumentException e) {
+      throw new SerDeException(e);
+    } catch (SecurityException e) {
+      throw new SerDeException(e);
+    } catch (InstantiationException e) {
+      throw new SerDeException(e);
+    } catch (IllegalAccessException e) {
+      throw new SerDeException(e);
+    } catch (InvocationTargetException e) {
+      throw new SerDeException(e);
+    } catch (NoSuchMethodException e) {
+      // the constructor wasn't defined in the implementation class. Flag error
+      throw new SerDeException("Constructor not defined in composite key class ["
+          + compositeKeyClass.getName() + "]", e);
+    }
+  }
 
   /**
    * @return the useJSONSerialize
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
index b254b0d..fc40195 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
@@ -45,6 +45,7 @@
   */
   private Result result;
   private List<ColumnMapping> columnsMapping;
+  private Object compositeKeyObj;
   private ArrayList<Object> cachedList;
 
   /**
@@ -59,9 +60,22 @@ public LazyHBaseRow(LazySimpleStructObjectInspector oi) {
    * @see LazyHBaseRow#init(Result)
    */
   public void init(Result r, List<ColumnMapping> columnsMapping) {
+    init(r, columnsMapping, null);
+  }
+
+  /**
+   * Set the HBase row data (a Result writable) for this LazyStruct.
+   *
+   * @see LazyHBaseRow#init(Result)
+   *
+   * @param compositeKeyObj
+   *          custom implementation to interpret the composite key
+   */
+  public void init(Result r, List<ColumnMapping> columnsMapping, Object compositeKeyObj) {
     result = r;
     this.columnsMapping = columnsMapping;
+    this.compositeKeyObj = compositeKeyObj;
     setParsed(false);
   }
 
@@ -117,7 +131,13 @@ public Object getField(int fieldID) {
       parse();
     }
 
-    return uncheckedGetField(fieldID);
+    Object value = uncheckedGetField(fieldID);
+
+    if (columnsMapping.get(fieldID).hbaseRowKey && compositeKeyObj != null) {
+      return compositeKeyObj;
+    } else {
+      return value;
+    }
   }
 
   /**
@@ -162,6 +182,12 @@ private Object uncheckedGetField(int fieldID) {
 
       if (ref != null) {
         fields[fieldID].init(ref, 0, ref.getData().length);
+
+        // if it was a row key and we have been provided a custom composite key class, initialize it
+        // with the bytes for the row key
+        if (colMap.hbaseRowKey && compositeKeyObj != null) {
+          ((LazyStruct) compositeKeyObj).init(ref, 0, ref.getData().length);
+        }
       }
     }
 
diff --git 
hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java
new file mode 100644
index 0000000..e95bd0e
--- /dev/null
+++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java
@@ -0,0 +1,36 @@
+package org.apache.hadoop.hive.hbase;
+
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+
+public class HBaseTestCompositeKey extends HBaseCompositeKey {
+
+  byte[] bytes;
+  String bytesAsString;
+  Properties tbl;
+
+  public HBaseTestCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl) {
+    super(oi);
+    this.tbl = tbl;
+  }
+
+  @Override
+  public void init(ByteArrayRef bytes, int start, int length) {
+    this.bytes = bytes.getData();
+  }
+
+  @Override
+  public Object getField(int fieldID) {
+    if (bytesAsString == null) {
+      bytesAsString = Bytes.toString(bytes).trim();
+    }
+
+    // Pick the character corresponding to the field id and convert it to a byte array
+    byte[] fieldBytes = new byte[] {(byte) bytesAsString.charAt(fieldID)};
+
+    return toLazyObject(fieldID, fieldBytes);
+  }
+}
diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
index d25c731..c7589ad 100644
--- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
+++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.hbase;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -45,6 +46,7 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
 
 /**
  * Tests the HBaseSerDe class.
@@ -809,4 +811,172 @@ private void deserializeAndSerializeHivePrefixColumnFamily(HBaseSerDe serDe, Res
       assertEquals("Serialized put:", p.toString(), put.toString());
     }
   }
+
+  public void testHBaseSerDeCompositeKeyWithSeparator() throws SerDeException, TException,
+      IOException {
+    byte[] cfa = "cola".getBytes();
+
+    byte[] qualStruct = "struct".getBytes();
+
+    TestStruct testStruct = new TestStruct("A", "B", "C", true, (byte) 45);
+
+    byte[] rowKey = testStruct.getBytes();
+
+    // Data
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+    byte[] testData = "This is a test data".getBytes();
+
+    kvs.add(new KeyValue(rowKey, cfa, qualStruct, testData));
+
+    Result r = new Result(kvs);
+
+    Put p = new Put(rowKey);
+
+    // Post serialization, separators are automatically inserted between different fields in the
+    // struct. Currently there is no way to disable that. So the workaround here is to pad the
+    // data with the separator bytes before creating a "Put" object
+    p.add(new KeyValue(rowKey, cfa, qualStruct, testData));
+
+    // Create, initialize, and test the SerDe
+    HBaseSerDe serDe = new HBaseSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl = createPropertiesForCompositeKeyWithSeparator();
+    serDe.initialize(conf, tbl);
+
+    deserializeAndSerializeHBaseCompositeKey(serDe, r, p);
+  }
+
+  private Properties createPropertiesForCompositeKeyWithSeparator() {
+    Properties tbl = new Properties();
+    tbl.setProperty(serdeConstants.LIST_COLUMNS,
+        "key,astring");
+    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES,
+        "struct<col1:string,col2:string,col3:string>,string");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+        ":key,cola:struct");
+    tbl.setProperty(serdeConstants.COLLECTION_DELIM, "-");
+
+    return tbl;
+  }
+
+  public void testHBaseSerDeCompositeKeyWithoutSeparator() throws SerDeException, TException,
+      IOException {
+    byte[] cfa = "cola".getBytes();
+
+    byte[] qualStruct = "struct".getBytes();
+
+    TestStruct testStruct = new TestStruct("A", "B", "C", false, (byte) 0);
+
+    byte[] rowKey = testStruct.getBytes();
+
+    // Data
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+    byte[] testData = "This is a test data".getBytes();
+
+    kvs.add(new KeyValue(rowKey, cfa, qualStruct, testData));
+
+    Result r = new Result(kvs);
+
+    byte[] putRowKey = testStruct.getBytesWithDelimiters();
+
+    Put p = new Put(putRowKey);
+
+    // Post serialization, separators are automatically inserted between different fields in the
+    // struct. Currently there is no way to disable that. So the workaround here is to pad the
+    // data with the separator bytes before creating a "Put" object
+    p.add(new KeyValue(putRowKey, cfa, qualStruct, testData));
+
+    // Create, initialize, and test the SerDe
+    HBaseSerDe serDe = new HBaseSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl = createPropertiesForCompositeKeyWithoutSeparator();
+    serDe.initialize(conf, tbl);
+
+    deserializeAndSerializeHBaseCompositeKey(serDe, r, p);
+  }
+
+  private Properties createPropertiesForCompositeKeyWithoutSeparator() {
+    Properties tbl = new Properties();
+    tbl.setProperty(serdeConstants.LIST_COLUMNS,
+        "key,astring");
+    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES,
+        "struct<col1:string,col2:string,col3:string>,string");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+        ":key,cola:struct");
+    tbl.setProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS,
+        "org.apache.hadoop.hive.hbase.HBaseTestCompositeKey");
+
+    return tbl;
+  }
+
+  private void deserializeAndSerializeHBaseCompositeKey(HBaseSerDe serDe, Result r, Put p)
+      throws SerDeException, IOException {
+    StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector();
+
+    List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+
+    Object row = serDe.deserialize(r);
+
+    for (int j = 0; j < fieldRefs.size(); j++) {
+      Object fieldData = soi.getStructFieldData(row, fieldRefs.get(j));
+      assertNotNull(fieldData);
+    }
+
+    assertEquals(
+        "{\"key\":{\"col1\":\"A\",\"col2\":\"B\",\"col3\":\"C\"},\"astring\":\"This is a test data\"}",
+        SerDeUtils.getJSONString(row, soi));
+
+    // Now serialize
+    Put put = (Put) serDe.serialize(row, soi);
+
+    assertEquals("Serialized put:", p.toString(), put.toString());
+  }
+
+  class TestStruct {
+    String f1;
+    String f2;
+    String f3;
+    boolean hasSeparator;
+    byte separator;
+
+    TestStruct(String f1, String f2, String f3, boolean hasSeparator, byte separator) {
+      this.f1 = f1;
+      this.f2 = f2;
+      this.f3 = f3;
+      this.hasSeparator = hasSeparator;
+      this.separator = separator;
+    }
+
+    public byte[] getBytes() throws IOException {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+      bos.write(f1.getBytes());
+      if (hasSeparator) {
+        bos.write(separator); // Add field separator
+      }
+      bos.write(f2.getBytes());
+      if (hasSeparator) {
+        bos.write(separator);
+      }
+      bos.write(f3.getBytes());
+
+      return bos.toByteArray();
+    }
+
+    public byte[] getBytesWithDelimiters() throws IOException {
+      // Add Ctrl-B delimiter between the fields. This is necessary because for structs, in case no
+      // delimiter is provided, hive automatically adds Ctrl-B as a default delimiter between fields
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+      bos.write(f1.getBytes());
+      bos.write("\002".getBytes("UTF8"));
+      bos.write(f2.getBytes());
+      bos.write("\002".getBytes("UTF8"));
+      bos.write(f3.getBytes());
+
+      return bos.toByteArray();
+    }
+  }
 }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java
index 08400f1..df7c8db 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java
@@ -33,10 +33,10 @@
 /**
  * LazySimpleStructObjectInspector works on struct data that is stored in
  * LazyStruct. 
- * 
+ *
  * The names of the struct fields and the internal structure of the struct
  * fields are specified in the ctor of the LazySimpleStructObjectInspector.
- * 
+ *
  * Always use the ObjectInspectorFactory to create new ObjectInspector objects,
  * instead of directly creating an instance of this class.
  */
@@ -193,8 +193,15 @@ public Object getStructFieldData(Object data, StructField fieldRef) {
     if (data == null) {
       return null;
     }
-    LazyStruct struct = (LazyStruct) data;
-    return struct.getFieldsAsList();
+
+    // Iterate over all the fields picking up the nested structs within them
+    List<Object> result = new ArrayList<Object>(fields.size());
+
+    for (MyField myField : fields) {
+      result.add(getStructFieldData(data, myField));
+    }
+
+    return result;
   }
 
   // For LazyStruct