diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java new file mode 100644 index 0000000..cfa3826 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java @@ -0,0 +1,69 @@ +package org.apache.hadoop.hive.hbase; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyStruct; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; + +/** + * HBaseCompositeKey extension of LazyStruct. All complex composite keys should extend this class + * and override the {@link LazyStruct#getField(int)} method where fieldID corresponds to the ID of a + * key in the composite key. + *
+ * For example, for a composite key "/part1/part2/part3", part1 has id 0, part2 has id 1, and part3 has id 2. Custom implementations of getField(fieldID) should return the value corresponding to that fieldID: for the above example, getField(0) should return part1, getField(1) part2, and getField(2) part3. + *
+ */ +public class HBaseCompositeKey extends LazyStruct { + + public HBaseCompositeKey(LazySimpleStructObjectInspector oi) { + super(oi); + } + + @Override + public ArrayList<Object> getFieldsAsList() { + ArrayList<Object> allFields = new ArrayList<Object>(); + + List<? extends StructField> fields = oi.getAllStructFieldRefs(); + + for (int i = 0; i < fields.size(); i++) { + allFields.add(getField(i)); + } + + return allFields; + } + + /** + * Create and initialize a {@link LazyObject} with the given bytes for the given fieldID. + * + * @param fieldID + * field for which the object is to be created + * @param bytes + * value with which the object is to be initialized + * @return initialized {@link LazyObject} + */ + public LazyObject<? extends ObjectInspector> toLazyObject(int fieldID, byte[] bytes) { + ObjectInspector fieldOI = oi.getAllStructFieldRefs().get(fieldID).getFieldObjectInspector(); + + LazyObject<? extends ObjectInspector> lazyObject = LazyFactory.createLazyObject(fieldOI); + + ByteArrayRef ref = new ByteArrayRef(); + + ref.setData(bytes); + + // initialize the lazy object with the field bytes + lazyObject.init(ref, 0, ref.getData().length); + + return lazyObject; + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java index 7f37ba5..9ab07d7 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.hbase; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -65,6 +66,7 @@ public class HBaseSerDe implements SerDe { public static final String HBASE_TABLE_DEFAULT_STORAGE_TYPE = "hbase.table.default.storage.type"; public static final String HBASE_KEY_COL = ":key"; public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp"; + public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class"; public static final Log LOG = LogFactory.getLog(HBaseSerDe.class); private ObjectInspector cachedObjectInspector; @@ -76,6 +78,8 @@ private final ByteStream.Output serializeStream = new ByteStream.Output(); private int iKey; private long putTimestamp; + private Class<?> compositeKeyClass; + private Object compositeKeyObj; // used for serializing a field private byte [] separators; // the separators array @@ -122,6 +126,11 @@ cachedHBaseRow = new LazyHBaseRow( (LazySimpleStructObjectInspector) cachedObjectInspector); + if (compositeKeyClass != null) { + // instantiate the composite key class, passing it the object inspector for the key + initCompositeKeyClass(tbl); + } + if (LOG.isDebugEnabled()) { LOG.debug("HBaseSerDe initialized with : columnNames = " + serdeParams.getColumnNames() @@ -419,6 +428,16 @@ String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,"-1")); + String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS); + + if (compKeyClass != null) { + try { + compositeKeyClass = job.getClassByName(compKeyClass); + } catch (ClassNotFoundException e) { + throw new SerDeException(e); + } + } + // Parse and initialize the HBase columns mapping columnsMapping = parseColumnsMapping(hbaseColumnsMapping); @@ -502,7 +521,7 @@ throw new 
SerDeException(getClass().getName() + ": expects Result!"); } - cachedHBaseRow.init((Result) result, columnsMapping); + cachedHBaseRow.init((Result) result, columnsMapping, compositeKeyObj); return cachedHBaseRow; } @@ -546,10 +565,11 @@ throw new SerDeException("HBase row key cannot be NULL"); } - if(putTimestamp >= 0) + if (putTimestamp >= 0) { put = new Put(key,putTimestamp); - else + } else { put = new Put(key); + } // Serialize each field for (int i = 0; i < fields.size(); i++) { @@ -766,6 +786,47 @@ throw new RuntimeException("Unknown category type: " + objInspector.getCategory()); } + /** + * Instantiate the composite key class, passing it the object inspector for the key. + * + * @throws SerDeException + */ + private void initCompositeKeyClass(Properties tbl) throws SerDeException { + + int i = 0; + + // find the hbase row key + for (ColumnMapping colMap : columnsMapping) { + if (colMap.hbaseRowKey) { + break; + } + i++; + } + + ObjectInspector keyObjectInspector = ((LazySimpleStructObjectInspector) cachedObjectInspector) + .getAllStructFieldRefs().get(i).getFieldObjectInspector(); + + try { + compositeKeyObj = compositeKeyClass.getDeclaredConstructor( + LazySimpleStructObjectInspector.class, Properties.class) + .newInstance( + (LazySimpleStructObjectInspector) keyObjectInspector, tbl); + } catch (IllegalArgumentException e) { + throw new SerDeException(e); + } catch (SecurityException e) { + throw new SerDeException(e); + } catch (InstantiationException e) { + throw new SerDeException(e); + } catch (IllegalAccessException e) { + throw new SerDeException(e); + } catch (InvocationTargetException e) { + throw new SerDeException(e); + } catch (NoSuchMethodException e) { + // the expected (LazySimpleStructObjectInspector, Properties) constructor is not defined in the implementation class; flag an error + throw new SerDeException("Constructor not defined in composite key class [" + + compositeKeyClass.getName() + "]", e); + } + } /** * @return the useJSONSerialize diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java index d35bb52..eff7f7d 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java @@ -45,6 +45,7 @@ */ private Result result; private List<ColumnMapping> columnsMapping; + private Object compositeKeyObj; private ArrayList<Object> cachedList; /** @@ -59,9 +60,22 @@ * @see LazyHBaseRow#init(Result) */ public void init(Result r, List<ColumnMapping> columnsMapping) { + init(r, columnsMapping, null); + } + + /** + * Set the HBase row data (a Result writable) for this LazyStruct. 
+ * + * @see LazyHBaseRow#init(Result) + * + * @param compositeKeyObj + * custom implementation to interpret the composite key + */ + public void init(Result r, List<ColumnMapping> columnsMapping, Object compositeKeyObj) { result = r; this.columnsMapping = columnsMapping; + this.compositeKeyObj = compositeKeyObj; setParsed(false); } @@ -117,7 +131,13 @@ parse(); } - return uncheckedGetField(fieldID); + Object value = uncheckedGetField(fieldID); // for the row key this also initializes compositeKeyObj + + if (columnsMapping.get(fieldID).hbaseRowKey && compositeKeyObj != null) { + return compositeKeyObj; + } else { + return value; + } } /** @@ -161,6 +181,12 @@ if (ref != null) { fields[fieldID].init(ref, 0, ref.getData().length); + + // if this is the row key and a custom composite key class was provided, initialize it + // with the bytes of the row key + if (colMap.hbaseRowKey && compositeKeyObj != null) { + ((LazyStruct) compositeKeyObj).init(ref, 0, ref.getData().length); + } } } diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java new file mode 100644 index 0000000..e95bd0e --- /dev/null +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java @@ -0,0 +1,36 @@ +package org.apache.hadoop.hive.hbase; + +import java.util.Properties; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; + +public class HBaseTestCompositeKey extends HBaseCompositeKey { + + byte[] bytes; + String bytesAsString; + Properties tbl; + + public HBaseTestCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl) { + super(oi); + this.tbl = tbl; + } + + @Override + public void init(ByteArrayRef bytes, int start, int length) { + this.bytes = bytes.getData(); + } + + @Override + public Object getField(int fieldID) { + if (bytesAsString == null) { + bytesAsString = Bytes.toString(bytes).trim(); + } + + // Pick the character corresponding to the field id and convert it to a byte array + byte[] fieldBytes = new byte[] {(byte) bytesAsString.charAt(fieldID)}; + + return toLazyObject(fieldID, fieldBytes); + } +} diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java index e821282..8e2eeaa 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.hbase; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -32,6 +34,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; @@ -43,6 +46,7 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.thrift.TException; /** * Tests the HBaseSerDe 
class. @@ -688,4 +692,172 @@ public class TestHBaseSerDe extends TestCase { Put serializedPut = (Put) hbaseSerDe.serialize(row, soi); assertEquals("Serialized data: ", p.toString(), serializedPut.toString()); } + + public void testHBaseSerDeCompositeKeyWithSeparator() throws SerDeException, TException, + IOException { + byte[] cfa = "cola".getBytes(); + + byte[] qualStruct = "struct".getBytes(); + + TestStruct testStruct = new TestStruct("A", "B", "C", true, (byte) 45); // separator byte 45 = '-' + + byte[] rowKey = testStruct.getBytes(); + + // Data + List<KeyValue> kvs = new ArrayList<KeyValue>(); + + byte[] testData = "This is a test data".getBytes(); + + kvs.add(new KeyValue(rowKey, cfa, qualStruct, testData)); + + Result r = new Result(kvs); + + Put p = new Put(rowKey); + + // Post serialization, separators are automatically inserted between different fields in the + // struct. Currently there is no way to disable that. So the workaround here is to pad the + // data with the separator bytes before creating a "Put" object + p.add(new KeyValue(rowKey, cfa, qualStruct, testData)); + + // Create, initialize, and test the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createPropertiesForCompositeKeyWithSeparator(); + serDe.initialize(conf, tbl); + + deserializeAndSerializeHBaseCompositeKey(serDe, r, p); + } + + private Properties createPropertiesForCompositeKeyWithSeparator() { + Properties tbl = new Properties(); + tbl.setProperty(serdeConstants.LIST_COLUMNS, + "key,astring"); + tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, + "struct<col1:string,col2:string,col3:string>,string"); + tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, + ":key,cola:struct"); + tbl.setProperty(serdeConstants.COLLECTION_DELIM, "-"); + + return tbl; + } + + public void testHBaseSerDeCompositeKeyWithoutSeparator() throws SerDeException, TException, + IOException { + byte[] cfa = "cola".getBytes(); + + byte[] qualStruct = "struct".getBytes(); + + TestStruct testStruct = new TestStruct("A", "B", "C", false, (byte) 0); + + byte[] rowKey = testStruct.getBytes(); + + // Data + List<KeyValue> kvs = new ArrayList<KeyValue>(); + + byte[] testData = "This is a test data".getBytes(); + + kvs.add(new KeyValue(rowKey, cfa, qualStruct, testData)); + + Result r = new Result(kvs); + + byte[] putRowKey = testStruct.getBytesWithDelimiters(); + + Put p = new Put(putRowKey); + + // Post serialization, separators are automatically inserted between different fields in the + // struct. Currently there is no way to disable that. 
So the workaround here is to pad the + // data with the separator bytes before creating a "Put" object + p.add(new KeyValue(putRowKey, cfa, qualStruct, testData)); + + // Create, initialize, and test the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createPropertiesForCompositeKeyWithoutSeparator(); + serDe.initialize(conf, tbl); + + deserializeAndSerializeHBaseCompositeKey(serDe, r, p); + } + + private Properties createPropertiesForCompositeKeyWithoutSeparator() { + Properties tbl = new Properties(); + tbl.setProperty(serdeConstants.LIST_COLUMNS, + "key,astring"); + tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, + "struct<col1:string,col2:string,col3:string>,string"); + tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, + ":key,cola:struct"); + tbl.setProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS, + "org.apache.hadoop.hive.hbase.HBaseTestCompositeKey"); + + return tbl; + } + + private void deserializeAndSerializeHBaseCompositeKey(HBaseSerDe serDe, Result r, Put p) + throws SerDeException, IOException { + StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector(); + + List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs(); + + Object row = serDe.deserialize(r); + + for (int j = 0; j < fieldRefs.size(); j++) { + Object fieldData = soi.getStructFieldData(row, fieldRefs.get(j)); + assertNotNull(fieldData); + } + + assertEquals( + "{\"key\":{\"col1\":\"A\",\"col2\":\"B\",\"col3\":\"C\"},\"astring\":\"This is a test data\"}", + SerDeUtils.getJSONString(row, soi)); + + // Now serialize + Put put = (Put) serDe.serialize(row, soi); + + assertEquals("Serialized put:", p.toString(), put.toString()); + } + + class TestStruct { + String f1; + String f2; + String f3; + boolean hasSeparator; + byte separator; + + TestStruct(String f1, String f2, String f3, boolean hasSeparator, byte separator) { + this.f1 = f1; + this.f2 = f2; + this.f3 = f3; + this.hasSeparator = hasSeparator; + this.separator = separator; + } + + public byte[] getBytes() throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + bos.write(f1.getBytes()); + if (hasSeparator) { + bos.write(separator); // Add field separator + } + bos.write(f2.getBytes()); + if (hasSeparator) { + bos.write(separator); + } + bos.write(f3.getBytes()); + + return bos.toByteArray(); + } + + public byte[] getBytesWithDelimiters() throws IOException { + // Add the Ctrl-B (0x02) delimiter between the fields. This is necessary because, when no + // delimiter is provided for a struct, Hive automatically uses Ctrl-B as the default delimiter between fields + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + bos.write(2); // written as the raw byte 0x02 (Ctrl-B) + bos.write(f2.getBytes()); + bos.write(2); + bos.write(f3.getBytes()); + + return bos.toByteArray(); + } + } } diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java index 08400f1..df7c8db 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java @@ -33,10 +33,10 @@ import org.apache.hadoop.io.Text; /** * LazySimpleStructObjectInspector works on struct data that is stored in * LazyStruct. 
- * + * The names of the struct fields and the internal structure of the struct fields are specified in the ctor of the LazySimpleStructObjectInspector. - * + * Always use the ObjectInspectorFactory to create new ObjectInspector objects, instead of directly creating an instance of this class. */ @@ -193,8 +193,15 @@ public class LazySimpleStructObjectInspector extends StructObjectInspector { if (data == null) { return null; } - LazyStruct struct = (LazyStruct) data; - return struct.getFieldsAsList(); + + // Iterate over the fields via getStructFieldData() so that an overridden getField() (e.g. a custom composite key) is honored + List<Object> result = new ArrayList<Object>(fields.size()); + + for (MyField myField : fields) { + result.add(getStructFieldData(data, myField)); + } + + return result; } // For LazyStruct
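
Note (not part of the patch): a minimal sketch of how the new "hbase.composite.key.class" hook is intended to be used. A table registers a subclass of HBaseCompositeKey; HBaseSerDe instantiates it reflectively through the (LazySimpleStructObjectInspector, Properties) constructor, and LazyHBaseRow substitutes it for the row key during deserialization. The package, class name, and underscore delimiter below are illustrative assumptions, not names from the patch.

package com.example.hive.hbase; // hypothetical package

import java.util.Properties;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.HBaseCompositeKey;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;

/**
 * Hypothetical composite key for row keys of the form "part1_part2_part3",
 * mapped in Hive as a struct with three string fields (ids 0, 1, 2).
 */
public class DelimitedCompositeKey extends HBaseCompositeKey {

  private byte[] rowKeyBytes;

  // This exact (LazySimpleStructObjectInspector, Properties) constructor is required:
  // HBaseSerDe.initCompositeKeyClass() looks it up reflectively and throws a
  // SerDeException when it is missing.
  public DelimitedCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl) {
    super(oi);
  }

  @Override
  public void init(ByteArrayRef bytes, int start, int length) {
    // LazyHBaseRow.uncheckedGetField() hands over the raw row-key bytes here.
    rowKeyBytes = bytes.getData();
  }

  @Override
  public Object getField(int fieldID) {
    // Split the raw key on the assumed '_' delimiter and wrap the requested part
    // as a LazyObject via the helper inherited from HBaseCompositeKey.
    String[] parts = Bytes.toString(rowKeyBytes).split("_");
    return toLazyObject(fieldID, Bytes.toBytes(parts[fieldID]));
  }
}

The class would then be registered on the table, e.g. via SERDEPROPERTIES/TBLPROPERTIES with "hbase.composite.key.class" = "com.example.hive.hbase.DelimitedCompositeKey", alongside a ":key" mapping to a struct column, mirroring what createPropertiesForCompositeKeyWithoutSeparator() sets up in the test above.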